This is an automated email from the ASF dual-hosted git repository.

mgreber pushed a commit to branch flink-replication
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 4cf118a9f5fc738efe02ccc6d53af64f69fea61c
Author: Marton Greber <[email protected]>
AuthorDate: Wed Mar 26 22:53:25 2025 +0100

    asdf
---
 .../CustomAbstractSingleOperationMapper.java       | 65 ----------------------
 .../kudu/replication/CustomColumnSchemas.java      | 29 ++++++++++
 .../kudu/replication/CustomCreateTableOptions.java | 63 +++++++++++++++++++++
 .../CustomReplicationOperationMapper.java          | 42 ++++++++++++++
 .../CustomReplicationRowRestultConverter.java      | 28 ++++++++++
 .../kudu/replication/ReplicationJobConfig.java     |  3 +-
 .../kudu/replication/ReplicationJobExecutor.java   | 23 +++++---
 .../replication/ReplicationTableInitializer.java   |  4 ++
 .../apache/kudu/replication/TestReplication.java   |  7 ++-
 9 files changed, 187 insertions(+), 77 deletions(-)

diff --git 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomAbstractSingleOperationMapper.java
 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomAbstractSingleOperationMapper.java
deleted file mode 100644
index 01681b0de..000000000
--- 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomAbstractSingleOperationMapper.java
+++ /dev/null
@@ -1,65 +0,0 @@
-package org.apache.kudu.replication;
-
-import org.apache.flink.connector.kudu.connector.writer.KuduOperationMapper;
-import org.apache.flink.types.Row;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.Operation;
-import org.apache.kudu.client.PartialRow;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-
-public class CustomAbstractSingleOperationMapper implements 
KuduOperationMapper<Row> {
-
-    private final KuduOperation operation;
-
-
-    public CustomAbstractSingleOperationMapper(KuduOperation operation) {
-        this.operation = operation;
-    }
-
-    public Optional<Operation> createBaseOperation(Row input, KuduTable table) 
{
-        if (operation == null) {
-            throw new UnsupportedOperationException(
-                    "createBaseOperation must be overridden if no operation 
specified in constructor");
-        }
-        switch (operation) {
-            case INSERT:
-                return Optional.of(table.newInsert());
-            case UPDATE:
-                return Optional.of(table.newUpdate());
-            case UPSERT:
-                return Optional.of(table.newUpsert());
-            case DELETE:
-                return Optional.of(table.newDelete());
-            default:
-                throw new RuntimeException("Unknown operation " + operation);
-        }
-    }
-
-    @Override
-    public List<Operation> createOperations(Row input, KuduTable table) {
-        Optional<Operation> operationOpt = createBaseOperation(input, table);
-        if (!operationOpt.isPresent()) {
-            return Collections.emptyList();
-        }
-
-        Operation operation = operationOpt.get();
-        PartialRow partialRow = operation.getRow();
-
-        for (int i = 0; i < input.getArity(); i++) {
-            partialRow.addObject(i, input.getField(i));
-        }
-
-        return Collections.singletonList(operation);
-    }
-
-    /** Kudu operation types. */
-    public enum KuduOperation {
-        INSERT,
-        UPDATE,
-        UPSERT,
-        DELETE
-    }
-}
\ No newline at end of file
diff --git 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomColumnSchemas.java
 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomColumnSchemas.java
new file mode 100644
index 000000000..29f36a714
--- /dev/null
+++ 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomColumnSchemas.java
@@ -0,0 +1,29 @@
+package org.apache.kudu.replication;
+
+import org.apache.flink.connector.kudu.connector.ColumnSchemasFactory;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduTable;
+
+import java.util.List;
+
+public class CustomColumnSchemas implements ColumnSchemasFactory {
+    private final ReplicationJobConfig config;
+
+    public CustomColumnSchemas(ReplicationJobConfig config) {
+        this.config = config;
+    }
+
+    @Override
+    public List<ColumnSchema> getColumnSchemas() {
+        try {
+            KuduClient sourceClient = new 
KuduClient.KuduClientBuilder(String.join(",", 
config.getSourceMasterAddresses())).build();
+            KuduTable table = sourceClient.openTable(config.getTableName());
+            sourceClient.close();
+            return table.getSchema().getColumns();
+        } catch (KuduException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomCreateTableOptions.java
 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomCreateTableOptions.java
new file mode 100644
index 000000000..81d81f5b6
--- /dev/null
+++ 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomCreateTableOptions.java
@@ -0,0 +1,63 @@
+package org.apache.kudu.replication;
+
+import org.apache.flink.connector.kudu.connector.CreateTableOptionsFactory;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Partition;
+import org.apache.kudu.client.PartitionSchema;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class CustomCreateTableOptions implements CreateTableOptionsFactory {
+    private final ReplicationJobConfig config;
+
+    public CustomCreateTableOptions(ReplicationJobConfig config) {
+        this.config = config;
+    }
+    @Override
+    public org.apache.kudu.client.CreateTableOptions getCreateTableOptions() {
+        try {
+            KuduClient sourceClient = new 
KuduClient.KuduClientBuilder(String.join(",", 
config.getSourceMasterAddresses())).build();
+            KuduTable table = sourceClient.openTable(config.getTableName());
+            CreateTableOptions createTableOptions = new CreateTableOptions();
+
+            createTableOptions.setComment(table.getComment());
+            createTableOptions.setNumReplicas(table.getNumReplicas());
+            createTableOptions.setExtraConfigs(table.getExtraConfig());
+            PartitionSchema ps = table.getPartitionSchema();
+
+            // Add hash partitions into the mix
+            List<PartitionSchema.HashBucketSchema> hb = 
ps.getHashBucketSchemas();
+            for (PartitionSchema.HashBucketSchema hbSchema : hb) {
+                List<String> colNames = new ArrayList<>();
+                for (int id : hbSchema.getColumnIds()) {
+                    int idx = table.getSchema().getColumnIndex(id);
+                    
colNames.add(table.getSchema().getColumnByIndex(idx).getName());
+                }
+                createTableOptions.addHashPartitions(colNames, 
hbSchema.getNumBuckets());
+            }
+
+            PartitionSchema.RangeSchema rangeSchema = ps.getRangeSchema();
+            List<String> colNames = new ArrayList<>();
+            for (int id : rangeSchema.getColumnIds()) {
+                int idx = table.getSchema().getColumnIndex(id);
+                
colNames.add(table.getSchema().getColumnByIndex(idx).getName());
+            }
+            createTableOptions.setRangePartitionColumns(colNames);
+            List<Partition> ranges = table.getRangePartitions(9999);
+            for (Partition partition : ranges) {
+                
createTableOptions.addRangePartition(partition.getDecodedRangeKeyStart(table), 
partition.getDecodedRangeKeyEnd(table));
+            }
+
+            sourceClient.close();
+            return createTableOptions;
+        } catch (RuntimeException | KuduException e) {
+            throw new RuntimeException(e);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomReplicationOperationMapper.java
 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomReplicationOperationMapper.java
new file mode 100644
index 000000000..466b7bcc3
--- /dev/null
+++ 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomReplicationOperationMapper.java
@@ -0,0 +1,42 @@
+package org.apache.kudu.replication;
+
+import org.apache.flink.connector.kudu.connector.writer.KuduOperationMapper;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+public class CustomReplicationOperationMapper implements 
KuduOperationMapper<Row> {
+
+    public Optional<Operation> createBaseOperation(Row input, KuduTable table) 
{
+        RowKind kind = input.getKind();
+        if (kind == RowKind.DELETE) {
+            return Optional.of(table.newDeleteIgnore());
+        } else {
+            return Optional.of(table.newUpsertIgnore());
+        }
+    }
+
+    @Override
+    public List<Operation> createOperations(Row input, KuduTable table) {
+        Optional<Operation> operationOpt = createBaseOperation(input, table);
+        if (!operationOpt.isPresent()) {
+            return Collections.emptyList();
+        }
+
+        Operation operation = operationOpt.get();
+        PartialRow partialRow = operation.getRow();
+
+        for (int i = 0; i < input.getArity(); i++) {
+            partialRow.addObject(i, input.getField(i));
+        }
+
+        return Collections.singletonList(operation);
+    }
+
+}
\ No newline at end of file
diff --git 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomReplicationRowRestultConverter.java
 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomReplicationRowRestultConverter.java
new file mode 100644
index 000000000..16f08ef4e
--- /dev/null
+++ 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomReplicationRowRestultConverter.java
@@ -0,0 +1,28 @@
+package org.apache.kudu.replication;
+
+import org.apache.flink.connector.kudu.connector.converter.RowResultConverter;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.RowResult;
+
+public class CustomReplicationRowRestultConverter implements 
RowResultConverter<Row> {
+    public CustomReplicationRowRestultConverter() {
+    }
+
+    public Row convert(RowResult row) {
+        Schema schema = row.getColumnProjection();
+        Row values = new Row(schema.getColumnCount());
+        if (schema.hasIsDeleted()) {
+            values.setKind((RowKind.DELETE));
+        }
+        schema.getColumns().forEach((column) -> {
+            String name = column.getName();
+            int pos = schema.getColumnIndex(name);
+            if (!row.isNull(name)) {
+                values.setField(pos, row.getObject(name));
+            }
+        });
+        return values;
+    }
+}
diff --git 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
index 5f46b79c0..ca236babf 100644
--- 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
+++ 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
@@ -17,12 +17,13 @@
 
 package org.apache.kudu.replication;
 
+import java.io.Serializable;
 import java.util.List;
 
 /**
  * A configuration object for ReplicationJobs used for the Kudu Flink based 
replication.
  */
-public class ReplicationJobConfig {
+public class ReplicationJobConfig implements Serializable {
     private List<String> sourceMasterAddresses;
     private List<String> sinkMasterAddresses;
     private String tableName;
diff --git 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java
 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java
index d9d3c7a2f..9d1806c4f 100644
--- 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java
+++ 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.connector.kudu.connector.ColumnSchemasFactory;
+import org.apache.flink.connector.kudu.connector.CreateTableOptionsFactory;
 import org.apache.flink.connector.kudu.connector.KuduTableInfo;
 import 
org.apache.flink.connector.kudu.connector.converter.RowResultRowConverter;
 import org.apache.flink.connector.kudu.connector.reader.KuduInputSplit;
@@ -49,11 +50,16 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.Operation;
 import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.Partition;
+import org.apache.kudu.client.PartitionSchema;
 import org.apache.kudu.shaded.com.google.common.collect.Lists;
 
 import java.util.*;
@@ -73,12 +79,19 @@ class ReplicationJobExecutor {
      * Executes the actual business logic of the replication.
      */
     public void runJob() throws Exception {
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
+        KuduClient sinkClient = new 
KuduClient.KuduClientBuilder(String.join(",", 
config.getSinkMasterAddresses())).build();
+        KuduClient sourceClient = new 
KuduClient.KuduClientBuilder(String.join(",", 
config.getSourceMasterAddresses())).build();
+        if (!sinkClient.tableExists(config.getTableName())) {
+            Schema schema = 
sourceClient.openTable(config.getTableName()).getSchema();
+            sinkClient.createTable(config.getTableName(), schema, new 
CustomCreateTableOptions(config).getCreateTableOptions());
+        }
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         KuduSink<Row> kuduSink = new KuduSinkBuilder<Row>()
                 .setWriterConfig(getWriterConfig())
                 .setTableInfo(getTableInfo())
-                .setOperationMapper(getRowOperationMapper())
+                .setOperationMapper(new CustomReplicationOperationMapper())
                 .build();
 
         KuduSource<Row> kuduSource = new KuduSourceBuilder<Row>()
@@ -102,12 +115,6 @@ class ReplicationJobExecutor {
             .build();
     }
 
-    // TODO: figure out the mechanism to get this auto populated
-    // for now just got the column names hardcoed ~.~
-    private KuduOperationMapper getRowOperationMapper() {
-        return new 
CustomAbstractSingleOperationMapper(CustomAbstractSingleOperationMapper.KuduOperation.UPSERT);
-
-    }
 
     private KuduReaderConfig getReaderConfig() {
         return KuduReaderConfig.Builder
diff --git 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationTableInitializer.java
 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationTableInitializer.java
new file mode 100644
index 000000000..525f2dbd2
--- /dev/null
+++ 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationTableInitializer.java
@@ -0,0 +1,4 @@
+package org.apache.kudu.replication;
+
+public class ReplicationTableInitializer {
+}
diff --git 
a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java
 
b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java
index 16f1364dd..e5f23477a 100644
--- 
a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java
+++ 
b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java
@@ -43,7 +43,7 @@ public class TestReplication {
         //Setup source table
         try {
             createTableWithOneThousandRows(
-                    this.sourceHarness.getAsyncClient(), TABLE_NAME, 32 * 
1024, DEFAULT_SLEEP);
+                    this.sourceHarness.getAsyncClient(), TABLE_NAME, 10, 
DEFAULT_SLEEP);
         } catch (Exception e) {
             e.printStackTrace();
             LOG.error(e.getMessage());
@@ -52,7 +52,7 @@ public class TestReplication {
 
         // We create the sink table here, temporary workaround to get things 
up and running.
         // TODO: ideally we should not need to do this.
-        KuduTable sinkTable = createDefaultTable(this.sinkHarness.getClient(), 
TABLE_NAME);
+//        KuduTable sinkTable = 
createDefaultTable(this.sinkHarness.getClient(), TABLE_NAME);
 
         ReplicationJobConfig config = new ReplicationJobConfig();
 
@@ -63,7 +63,8 @@ public class TestReplication {
         ReplicationJobExecutor executor = new ReplicationJobExecutor(config);
         executor.runJob();
 
-        sinkTable = sinkClient.openTable(TABLE_NAME);
+//        sinkTable = sinkClient.openTable(TABLE_NAME);
+        KuduTable sinkTable = sinkClient.openTable(TABLE_NAME);
         assertEquals(1000, countRowsInTable(sinkTable));
     }
 

Reply via email to