This is an automated email from the ASF dual-hosted git repository. mgreber pushed a commit to branch flink-replication in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 72cf988e57e133127aa43e1ecdaffba1f356522e Author: Marton Greber <[email protected]> AuthorDate: Tue Mar 25 18:04:56 2025 +0100 back out the schema names in the operation mapper --- .../CustomAbstractSingleOperationMapper.java | 65 ++++++++++++++++++++++ .../kudu/replication/ReplicationJobExecutor.java | 10 +++- 2 files changed, 73 insertions(+), 2 deletions(-) diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomAbstractSingleOperationMapper.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomAbstractSingleOperationMapper.java new file mode 100644 index 000000000..01681b0de --- /dev/null +++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomAbstractSingleOperationMapper.java @@ -0,0 +1,65 @@ +package org.apache.kudu.replication; + +import org.apache.flink.connector.kudu.connector.writer.KuduOperationMapper; +import org.apache.flink.types.Row; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.Operation; +import org.apache.kudu.client.PartialRow; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +public class CustomAbstractSingleOperationMapper implements KuduOperationMapper<Row> { + + private final KuduOperation operation; + + + public CustomAbstractSingleOperationMapper(KuduOperation operation) { + this.operation = operation; + } + + public Optional<Operation> createBaseOperation(Row input, KuduTable table) { + if (operation == null) { + throw new UnsupportedOperationException( + "createBaseOperation must be overridden if no operation specified in constructor"); + } + switch (operation) { + case INSERT: + return Optional.of(table.newInsert()); + case UPDATE: + return Optional.of(table.newUpdate()); + case UPSERT: + return Optional.of(table.newUpsert()); + case DELETE: + return Optional.of(table.newDelete()); + default: + throw new RuntimeException("Unknown operation " + operation); + } + } + + @Override + public List<Operation> createOperations(Row input, KuduTable table) { + Optional<Operation> operationOpt = createBaseOperation(input, table); + if (!operationOpt.isPresent()) { + return Collections.emptyList(); + } + + Operation operation = operationOpt.get(); + PartialRow partialRow = operation.getRow(); + + for (int i = 0; i < input.getArity(); i++) { + partialRow.addObject(i, input.getField(i)); + } + + return Collections.singletonList(operation); + } + + /** Kudu operation types. */ + public enum KuduOperation { + INSERT, + UPDATE, + UPSERT, + DELETE + } +} \ No newline at end of file diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java index d04bfe22b..d9d3c7a2f 100644 --- a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java +++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java @@ -33,6 +33,7 @@ import org.apache.flink.connector.kudu.connector.reader.KuduReader; import org.apache.flink.connector.kudu.connector.reader.KuduReaderConfig; import org.apache.flink.connector.kudu.connector.reader.KuduReaderIterator; import org.apache.flink.connector.kudu.connector.writer.AbstractSingleOperationMapper; +import org.apache.flink.connector.kudu.connector.writer.KuduOperationMapper; import org.apache.flink.connector.kudu.connector.writer.KuduWriterConfig; import org.apache.flink.connector.kudu.connector.writer.RowDataUpsertOperationMapper; import org.apache.flink.connector.kudu.connector.writer.RowOperationMapper; @@ -50,6 +51,9 @@ import org.apache.flink.types.Row; import org.apache.kudu.ColumnSchema; import org.apache.kudu.Type; import org.apache.kudu.client.CreateTableOptions; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.Operation; +import org.apache.kudu.client.PartialRow; import org.apache.kudu.shaded.com.google.common.collect.Lists; import java.util.*; @@ -100,8 +104,8 @@ class ReplicationJobExecutor { // TODO: figure out the mechanism to get this auto populated // for now just got the column names hardcoed ~.~ - private RowOperationMapper getRowOperationMapper() { - return new RowOperationMapper(new String[]{"key", "column1_i", "column2_i", "column3_s", "column4_b"}, AbstractSingleOperationMapper.KuduOperation.UPSERT); + private KuduOperationMapper getRowOperationMapper() { + return new CustomAbstractSingleOperationMapper(CustomAbstractSingleOperationMapper.KuduOperation.UPSERT); } @@ -116,3 +120,5 @@ class ReplicationJobExecutor { } } + +
