RocMarshal commented on code in PR #2: URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1442712519
########## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/statements/JdbcQueryStatement.java: ########## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.datasource.statements; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +/** + * Sets {@link PreparedStatement} parameters to use in JDBC Sink based on a specific type of record. + */ +@PublicEvolving +public interface JdbcQueryStatement<T> extends Serializable { + String query(); + + void map(PreparedStatement ps, T data) throws SQLException; Review Comment: - `data` -> `record` - The method described by the interface here is more like filling the data of a message into a `PreparedStatement`. So how about - changing the interface name to something like `JdbcStatementFiller` or `JdbcStatementRender` - changing the method name to `fill` or `render` ? 
########## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/statements/JdbcQueryStatement.java: ########## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.datasource.statements; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +/** + * Sets {@link PreparedStatement} parameters to use in JDBC Sink based on a specific type of record. Review Comment: How about ``` * Sets {@link PreparedStatement} parameters that are used in the JDBC Sink based on a specified type of record. ``` ? 
########## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/transactions/xa/domain/TransactionId.java: ########## @@ -0,0 +1,271 @@ +package org.apache.flink.connector.jdbc.datasource.transactions.xa.domain; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; + +import javax.transaction.xa.Xid; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; + +import static org.apache.flink.util.StringUtils.byteToHexString; + +/** A simple {@link Xid} implementation. */ +public class TransactionId implements Xid, Serializable { + + private static final long serialVersionUID = 1L; + + private static final int FORMAT_ID = 202; + + private final byte[] jobId; + private final int subtaskId; + private final int numberOfSubtasks; + private final long checkpointId; + private final int attempts; + private final boolean restored; + + private TransactionId( + byte[] jobId, + int subtaskId, + int numberOfSubtasks, + long checkpointId, + int attempts, + boolean restored) { + this.jobId = jobId; + this.subtaskId = subtaskId; + this.numberOfSubtasks = numberOfSubtasks; + this.checkpointId = checkpointId; + this.attempts = attempts; + this.restored = restored; + } + + public static TransactionId empty() { + return create(new byte[JobID.SIZE], 0, 0); + } + + public static TransactionId create(byte[] jobId, int subtaskId, int numberOfSubtasks) { + return new TransactionId(jobId, subtaskId, numberOfSubtasks, -1, 0, false); + } + + public static TransactionId restore( + byte[] jobId, int subtaskId, 
int numberOfSubtasks, long checkpointId, int attempts) { + return new TransactionId(jobId, subtaskId, numberOfSubtasks, checkpointId, attempts, true); + } + + public static TransactionId createFromXid( + int formatId, byte[] globalTransactionId, byte[] branchQualifier) throws IOException { + return fromXid(formatId, globalTransactionId, branchQualifier, false); + } + + public static TransactionId restoreFromXid( + int formatId, byte[] globalTransactionId, byte[] branchQualifier) throws IOException { + return fromXid(formatId, globalTransactionId, branchQualifier, true); + } + + public static TransactionId fromXid( + int formatId, byte[] globalTransactionId, byte[] branchQualifier, boolean restored) + throws IOException { + if (FORMAT_ID != formatId) { + throw new IOException(String.format("Xid formatId (%s) is not valid", formatId)); + } + + final DataInputDeserializer gid = new DataInputDeserializer(globalTransactionId); + byte[] jobIdBytes = readJobId(gid); + int subtaskId = gid.readInt(); + + final DataInputDeserializer branch = new DataInputDeserializer(branchQualifier); + int numberOfSubtasks = branch.readInt(); + long checkpoint = branch.readLong(); + + return new TransactionId(jobIdBytes, subtaskId, numberOfSubtasks, checkpoint, 0, restored); + } + + public static TransactionId deserialize(byte[] bytes) { + try { + final DataInputDeserializer in = new DataInputDeserializer(bytes); + byte[] jobIdBytes = readJobId(in); + int subtaskId = in.readInt(); + int numberOfSubtasks = in.readInt(); + long checkpoint = in.readLong(); + int attempts = in.readInt(); + return restore(jobIdBytes, subtaskId, numberOfSubtasks, checkpoint, attempts); + } catch (IOException e) { + throw new FlinkRuntimeException(e.getLocalizedMessage()); Review Comment: why not use `FlinkRuntimeException(e)` ? 
########## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterStateSerializer.java: ########## @@ -0,0 +1,76 @@ +package org.apache.flink.connector.jdbc.sink.writer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** {@link JdbcWriterState} serializer. */ +@Internal +public class JdbcWriterStateSerializer implements SimpleVersionedSerializer<JdbcWriterState> { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcWriterStateSerializer.class); + + @Override + public int getVersion() { + return 2; + } + + @Override + public byte[] serialize(JdbcWriterState state) throws IOException { + final DataOutputSerializer out = new DataOutputSerializer(1); + out.writeInt(state.getHanging().size()); + for (TransactionId tid : state.getHanging()) { + byte[] tIdBytes = tid.serialize(); + out.writeByte(tIdBytes.length); + out.write(tIdBytes, 0, tIdBytes.length); + } + out.writeInt(state.getPrepared().size()); + for (TransactionId tid : state.getPrepared()) { + byte[] tIdBytes = tid.serialize(); + out.writeByte(tIdBytes.length); + out.write(tIdBytes, 0, tIdBytes.length); + } + return out.getSharedBuffer(); + } + + @Override + public JdbcWriterState deserialize(int version, byte[] serialized) throws IOException { + final DataInputDeserializer in = new DataInputDeserializer(serialized); + + if (version == getVersion()) { + return deserializeV2(in); + } + + LOG.error("Unknown version of state: " + version); Review Comment: ```suggestion LOG.error("Unknown version of state: {}", version); ``` ########## 
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriter.java: ########## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.sink.writer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.StatefulSink; +import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; +import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; +import org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider; +import org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement; +import org.apache.flink.connector.jdbc.datasource.transactions.xa.XaTransaction; +import org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId; +import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat; +import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer; +import 
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; +import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitable; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.transaction.xa.Xid; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** @param <IN> */ Review Comment: It seems that the comments here are not complete? ########## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java: ########## @@ -107,5 +108,9 @@ public static <T> SinkFunction<T> exactlyOnceSink( exactlyOnceOptions); } + public static <IN> JdbcSinkBuilder<IN> newSink() { Review Comment: how about ``` public static <IN> JdbcSinkBuilder<IN> newSinkBuilder() { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org