This is an automated email from the ASF dual-hosted git repository.

mgreber pushed a commit to branch flink-replication
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 72cf988e57e133127aa43e1ecdaffba1f356522e
Author: Marton Greber <[email protected]>
AuthorDate: Tue Mar 25 18:04:56 2025 +0100

    back out the schema names in the operation mapper
---
 .../CustomAbstractSingleOperationMapper.java       | 65 ++++++++++++++++++++++
 .../kudu/replication/ReplicationJobExecutor.java   | 10 +++-
 2 files changed, 73 insertions(+), 2 deletions(-)

diff --git 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomAbstractSingleOperationMapper.java
 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomAbstractSingleOperationMapper.java
new file mode 100644
index 000000000..01681b0de
--- /dev/null
+++ 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomAbstractSingleOperationMapper.java
@@ -0,0 +1,65 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.kudu.replication;

import org.apache.flink.connector.kudu.connector.writer.KuduOperationMapper;
import org.apache.flink.types.Row;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.PartialRow;

import java.util.Collections;
import java.util.List;
import java.util.Optional;

/**
 * Operation mapper that converts a Flink {@link Row} into a single Kudu
 * {@link Operation} by positional index, without requiring the column names
 * up front (unlike {@code AbstractSingleOperationMapper}, which takes a
 * {@code String[]} of column names in its constructor).
 *
 * <p>Field {@code i} of the input row is written to column {@code i} of the
 * target table, so the row's arity and field order must match the table
 * schema.
 */
public class CustomAbstractSingleOperationMapper implements KuduOperationMapper<Row> {

    /** Operation type applied to every incoming row; may be null for subclasses
     *  that override {@link #createBaseOperation(Row, KuduTable)}. */
    private final KuduOperation operation;

    /**
     * Creates a mapper that emits the given operation type for every row.
     *
     * @param operation the Kudu operation to perform per row; pass {@code null}
     *                  only when {@link #createBaseOperation(Row, KuduTable)}
     *                  is overridden
     */
    public CustomAbstractSingleOperationMapper(KuduOperation operation) {
        this.operation = operation;
    }

    /**
     * Creates the base (empty) Kudu operation for one input row.
     *
     * @param input the incoming Flink row (unused here; available to overrides)
     * @param table the target Kudu table
     * @return the new operation, or empty to skip the row (override hook)
     * @throws UnsupportedOperationException if no operation was supplied in the
     *         constructor and this method was not overridden
     */
    public Optional<Operation> createBaseOperation(Row input, KuduTable table) {
        if (operation == null) {
            throw new UnsupportedOperationException(
                    "createBaseOperation must be overridden if no operation "
                            + "specified in constructor");
        }
        switch (operation) {
            case INSERT:
                return Optional.of(table.newInsert());
            case UPDATE:
                return Optional.of(table.newUpdate());
            case UPSERT:
                return Optional.of(table.newUpsert());
            case DELETE:
                return Optional.of(table.newDelete());
            default:
                // Unreachable for the current enum values; guards against
                // future additions to KuduOperation.
                throw new IllegalStateException("Unknown operation " + operation);
        }
    }

    /**
     * Maps one input row to a single Kudu operation, copying row fields to
     * table columns by position.
     *
     * @param input the incoming Flink row
     * @param table the target Kudu table
     * @return a singleton list with the populated operation, or an empty list
     *         if {@link #createBaseOperation(Row, KuduTable)} returned empty
     */
    @Override
    public List<Operation> createOperations(Row input, KuduTable table) {
        Optional<Operation> baseOperation = createBaseOperation(input, table);
        if (!baseOperation.isPresent()) {
            return Collections.emptyList();
        }

        // Renamed from "operation" to avoid shadowing the field of the same name.
        Operation op = baseOperation.get();
        PartialRow partialRow = op.getRow();

        // Positional copy: row field i -> column i of the table schema.
        for (int i = 0; i < input.getArity(); i++) {
            partialRow.addObject(i, input.getField(i));
        }

        return Collections.singletonList(op);
    }

    /** Kudu operation types. */
    public enum KuduOperation {
        INSERT,
        UPDATE,
        UPSERT,
        DELETE
    }
}
\ No newline at end of file
diff --git 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java
 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java
index d04bfe22b..d9d3c7a2f 100644
--- 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java
+++ 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java
@@ -33,6 +33,7 @@ import 
org.apache.flink.connector.kudu.connector.reader.KuduReader;
 import org.apache.flink.connector.kudu.connector.reader.KuduReaderConfig;
 import org.apache.flink.connector.kudu.connector.reader.KuduReaderIterator;
 import 
org.apache.flink.connector.kudu.connector.writer.AbstractSingleOperationMapper;
+import org.apache.flink.connector.kudu.connector.writer.KuduOperationMapper;
 import org.apache.flink.connector.kudu.connector.writer.KuduWriterConfig;
 import 
org.apache.flink.connector.kudu.connector.writer.RowDataUpsertOperationMapper;
 import org.apache.flink.connector.kudu.connector.writer.RowOperationMapper;
@@ -50,6 +51,9 @@ import org.apache.flink.types.Row;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Type;
 import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
 import org.apache.kudu.shaded.com.google.common.collect.Lists;
 
 import java.util.*;
@@ -100,8 +104,8 @@ class ReplicationJobExecutor {
 
     // TODO: figure out the mechanism to get this auto populated
     // for now just got the column names hardcoed ~.~
-    private RowOperationMapper getRowOperationMapper() {
-        return new RowOperationMapper(new String[]{"key", "column1_i", 
"column2_i", "column3_s", "column4_b"}, 
AbstractSingleOperationMapper.KuduOperation.UPSERT);
+    private KuduOperationMapper getRowOperationMapper() {
+        return new 
CustomAbstractSingleOperationMapper(CustomAbstractSingleOperationMapper.KuduOperation.UPSERT);
 
     }
 
@@ -116,3 +120,5 @@ class ReplicationJobExecutor {
     }
 
 }
+
+

Reply via email to