RocMarshal commented on code in PR #2:
URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1442712519


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/statements/JdbcQueryStatement.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.datasource.statements;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Sets {@link PreparedStatement} parameters to use in JDBC Sink based on a specific type of record.
+ */
+@PublicEvolving
+public interface JdbcQueryStatement<T> extends Serializable {
+    String query();
+
+    void map(PreparedStatement ps, T data) throws SQLException;

Review Comment:
   - `data` -> `record`
   - What the method on this interface does is really filling a record's data into a `PreparedStatement`. So how about
        - renaming the interface to something like `JdbcStatementFiller` or `JdbcStatementRender`
        - renaming the method to `fill` or `render`
   ?
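
   For context, a minimal sketch of what an implementation looks like (the `Book` type, SQL, and getters are hypothetical, only to illustrate the semantics):

   ```
   // Hypothetical implementation; the record type and SQL are placeholders.
   public class BookInsertStatement implements JdbcQueryStatement<Book> {
       @Override
       public String query() {
           return "INSERT INTO books (id, title) VALUES (?, ?)";
       }

       @Override
       public void map(PreparedStatement ps, Book record) throws SQLException {
           // The record's fields are filled into the statement parameters,
           // which is why `fill` may read more naturally than `map`.
           ps.setLong(1, record.getId());
           ps.setString(2, record.getTitle());
       }
   }
   ```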



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/statements/JdbcQueryStatement.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.datasource.statements;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Sets {@link PreparedStatement} parameters to use in JDBC Sink based on a specific type of record.

Review Comment:
   How about 
   
   ```
    * Sets {@link PreparedStatement} parameters that are used in the JDBC Sink, based on a specified type of record.
   ```
   
   ?



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/transactions/xa/domain/TransactionId.java:
##########
@@ -0,0 +1,271 @@
+package org.apache.flink.connector.jdbc.datasource.transactions.xa.domain;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static org.apache.flink.util.StringUtils.byteToHexString;
+
+/** A simple {@link Xid} implementation. */
+public class TransactionId implements Xid, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final int FORMAT_ID = 202;
+
+    private final byte[] jobId;
+    private final int subtaskId;
+    private final int numberOfSubtasks;
+    private final long checkpointId;
+    private final int attempts;
+    private final boolean restored;
+
+    private TransactionId(
+            byte[] jobId,
+            int subtaskId,
+            int numberOfSubtasks,
+            long checkpointId,
+            int attempts,
+            boolean restored) {
+        this.jobId = jobId;
+        this.subtaskId = subtaskId;
+        this.numberOfSubtasks = numberOfSubtasks;
+        this.checkpointId = checkpointId;
+        this.attempts = attempts;
+        this.restored = restored;
+    }
+
+    public static TransactionId empty() {
+        return create(new byte[JobID.SIZE], 0, 0);
+    }
+
+    public static TransactionId create(byte[] jobId, int subtaskId, int numberOfSubtasks) {
+        return new TransactionId(jobId, subtaskId, numberOfSubtasks, -1, 0, false);
+    }
+
+    public static TransactionId restore(
+            byte[] jobId, int subtaskId, int numberOfSubtasks, long checkpointId, int attempts) {
+        return new TransactionId(jobId, subtaskId, numberOfSubtasks, checkpointId, attempts, true);
+    }
+
+    public static TransactionId createFromXid(
+            int formatId, byte[] globalTransactionId, byte[] branchQualifier) throws IOException {
+        return fromXid(formatId, globalTransactionId, branchQualifier, false);
+    }
+
+    public static TransactionId restoreFromXid(
+            int formatId, byte[] globalTransactionId, byte[] branchQualifier) throws IOException {
+        return fromXid(formatId, globalTransactionId, branchQualifier, true);
+    }
+
+    public static TransactionId fromXid(
+            int formatId, byte[] globalTransactionId, byte[] branchQualifier, boolean restored)
+            throws IOException {
+        if (FORMAT_ID != formatId) {
+            throw new IOException(String.format("Xid formatId (%s) is not valid", formatId));
+        }
+
+        final DataInputDeserializer gid = new DataInputDeserializer(globalTransactionId);
+        byte[] jobIdBytes = readJobId(gid);
+        int subtaskId = gid.readInt();
+
+        final DataInputDeserializer branch = new DataInputDeserializer(branchQualifier);
+        int numberOfSubtasks = branch.readInt();
+        long checkpoint = branch.readLong();
+
+        return new TransactionId(jobIdBytes, subtaskId, numberOfSubtasks, checkpoint, 0, restored);
+    }
+
+    public static TransactionId deserialize(byte[] bytes) {
+        try {
+            final DataInputDeserializer in = new DataInputDeserializer(bytes);
+            byte[] jobIdBytes = readJobId(in);
+            int subtaskId = in.readInt();
+            int numberOfSubtasks = in.readInt();
+            long checkpoint = in.readLong();
+            int attempts = in.readInt();
+            return restore(jobIdBytes, subtaskId, numberOfSubtasks, checkpoint, attempts);
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(e.getLocalizedMessage());

Review Comment:
   Why not use `new FlinkRuntimeException(e)`? Passing only `e.getLocalizedMessage()` drops the original exception and its stack trace.
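
   e.g.

   ```
   // Keeps the original exception and its stack trace as the cause:
   throw new FlinkRuntimeException(e);
   ```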



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterStateSerializer.java:
##########
@@ -0,0 +1,76 @@
+package org.apache.flink.connector.jdbc.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** {@link JdbcWriterState} serializer. */
+@Internal
+public class JdbcWriterStateSerializer implements SimpleVersionedSerializer<JdbcWriterState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcWriterStateSerializer.class);
+
+    @Override
+    public int getVersion() {
+        return 2;
+    }
+
+    @Override
+    public byte[] serialize(JdbcWriterState state) throws IOException {
+        final DataOutputSerializer out = new DataOutputSerializer(1);
+        out.writeInt(state.getHanging().size());
+        for (TransactionId tid : state.getHanging()) {
+            byte[] tIdBytes = tid.serialize();
+            out.writeByte(tIdBytes.length);
+            out.write(tIdBytes, 0, tIdBytes.length);
+        }
+        out.writeInt(state.getPrepared().size());
+        for (TransactionId tid : state.getPrepared()) {
+            byte[] tIdBytes = tid.serialize();
+            out.writeByte(tIdBytes.length);
+            out.write(tIdBytes, 0, tIdBytes.length);
+        }
+        return out.getSharedBuffer();
+    }
+
+    @Override
+    public JdbcWriterState deserialize(int version, byte[] serialized) throws IOException {
+        final DataInputDeserializer in = new DataInputDeserializer(serialized);
+
+        if (version == getVersion()) {
+            return deserializeV2(in);
+        }
+
+        LOG.error("Unknown version of state: " + version);

Review Comment:
   ```suggestion
           LOG.error("Unknown version of state: {}", version);
   ```
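
   (With the `{}` placeholder, SLF4J builds the message only when the statement is actually logged, and it matches the parameterized logging style used elsewhere in the codebase.)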



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriter.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider;
+import org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.XaTransaction;
+import org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId;
+import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
+import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitable;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** @param <IN> */

Review Comment:
   It seems the Javadoc here is incomplete? Only the `@param <IN>` tag is present, without a class description.
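
   For example, something along these lines (the description wording is only a placeholder):

   ```
   /**
    * Writer for the JDBC sink (placeholder description).
    *
    * @param <IN> type of the records written to the database
    */
   ```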



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java:
##########
@@ -107,5 +108,9 @@ public static <T> SinkFunction<T> exactlyOnceSink(
                 exactlyOnceOptions);
     }
 
+    public static <IN> JdbcSinkBuilder<IN> newSink() {

Review Comment:
   How about
   ```
   public static <IN> JdbcSinkBuilder<IN> newSinkBuilder() {
   ```
   so that the name makes explicit that a builder is returned?
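
   A hypothetical call site, only to compare readability (the builder methods shown are placeholders, not necessarily the API in this PR):

   ```
   // "newSinkBuilder" makes clear that configuration follows before a sink exists.
   JdbcSink.<Order>newSinkBuilder()
           // ... configure statement, options, etc. (placeholder steps) ...
           .build();
   ```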



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
