This is an automated email from the ASF dual-hosted git repository.

mgreber pushed a commit to branch flink-replication
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit b4b4642e6877f0f6993ea3d9dbde87d4a5344959
Author: Marton Greber <[email protected]>
AuthorDate: Tue Apr 8 14:19:34 2025 +0200

    table init should work now
---
 .../apache/kudu/replication/ReplicationJob.java    |  10 ++
 .../kudu/replication/ReplicationJobConfig.java     |  31 ++--
 .../kudu/replication/ReplicationJobExecutor.java   |   4 +-
 .../replication/ReplicationTableInitializer.java   | 169 +++++++++++++++------
 .../apache/kudu/replication/TestReplication.java   |   5 -
 5 files changed, 153 insertions(+), 66 deletions(-)

diff --git 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java
 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java
index ba41d4ca8..bb574ae34 100644
--- 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java
+++ 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java
@@ -44,6 +44,16 @@ public class ReplicationJob {
         config.setSinkMasterAddresses(new ArrayList<>(
                 
Collections.singletonList(parameters.get("sinkMasterAddresses", 
"127.0.0.1:8764"))));
         config.setTableName(parameters.get("tableName", "test_table"));
+        if (parameters.has("restoreOwner")) {
+            
config.setRestoreOwner(Boolean.parseBoolean(parameters.get("restoreOwner", 
"false")));
+        }
+        if (parameters.has("tableSuffix")) {
+           config.setTableSuffix(parameters.get("tableSuffix", ""));
+        }
+
+
+        // Get all the params that can be used to tune the reader/writer configs and pass them down to the job executor here.
+        
 
         ReplicationJobExecutor replicationJobExecutor = new 
ReplicationJobExecutor(config);
         replicationJobExecutor.runJob();
diff --git 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
index ca236babf..ed6fc3578 100644
--- 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
+++ 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
@@ -27,35 +27,26 @@ public class ReplicationJobConfig implements Serializable {
     private List<String> sourceMasterAddresses;
     private List<String> sinkMasterAddresses;
     private String tableName;
+    private Boolean restoreOwner;
+    private String tableSuffix;
+
 
     public List<String> getSourceMasterAddresses() {
         return sourceMasterAddresses;
     }
 
-    /**
-     * @param sourceMasterAddresses
-     */
     public void setSourceMasterAddresses(List<String> sourceMasterAddresses) {
         this.sourceMasterAddresses = sourceMasterAddresses;
     }
 
-    /**
-     * @return List of master addresses in host:port format.
-     */
     public List<String> getSinkMasterAddresses() {
         return sinkMasterAddresses;
     }
 
-    /**
-     * @param sinkMasterAddresses
-     */
     public void setSinkMasterAddresses(List<String> sinkMasterAddresses) {
         this.sinkMasterAddresses = sinkMasterAddresses;
     }
 
-    /**
-     * @return Name of the table the configuration is created for.
-     */
     public String getTableName() {
         return this.tableName;
     }
@@ -63,4 +54,20 @@ public class ReplicationJobConfig implements Serializable {
     public void setTableName(String tableName) {
         this.tableName = tableName;
     }
+
+    public Boolean getRestoreOwner() {
+        return restoreOwner;
+    }
+    public void setRestoreOwner(Boolean restoreOwner) {
+        this.restoreOwner = restoreOwner;
+    }
+    public String getTableSuffix() {
+        return tableSuffix;
+    }
+    public void setTableSuffix(String tableSuffix) {
+        this.tableSuffix = tableSuffix;
+    }
+    public String getSinkTableName() {
+        return tableName + tableSuffix;
+    }
 }
diff --git 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java
 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java
index d51f1504b..14af67394 100644
--- 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java
+++ 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java
@@ -80,7 +80,9 @@ class ReplicationJobExecutor {
      */
     public void runJob() throws Exception {
 
-        ReplicationTableInitializer.createTableIfNotExists(config);
+        try (ReplicationTableInitializer tableInitializer = new 
ReplicationTableInitializer(config)) {
+            tableInitializer.createTableIfNotExists();
+        }
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         KuduSink<Row> kuduSink = new KuduSinkBuilder<Row>()
diff --git 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationTableInitializer.java
 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationTableInitializer.java
index 03ed2f523..7cd583c65 100644
--- 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationTableInitializer.java
+++ 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationTableInitializer.java
@@ -1,76 +1,149 @@
 package org.apache.kudu.replication;
 
+import jdk.nashorn.internal.ir.annotations.Immutable;
 import org.apache.kudu.Schema;
+import org.apache.kudu.client.AlterTableOptions;
 import org.apache.kudu.client.CreateTableOptions;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduException;
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.Partition;
 import org.apache.kudu.client.PartitionSchema;
+import org.apache.kudu.client.RangePartitionBound;
+import org.apache.kudu.client.RangePartitionWithCustomHashSchema;
 
 import java.util.ArrayList;
 import java.util.List;
 
-public class ReplicationTableInitializer {
+public class ReplicationTableInitializer implements AutoCloseable {
+    final private KuduClient sourceClient;
+    final private KuduClient sinkClient;
+    final private String sourceTableName;
+    final private String sinkTableName;
+    final private Boolean restoreOwner;
+    private KuduTable sourceTable;
 
-    static public void createTableIfNotExists(ReplicationJobConfig config) 
throws KuduException {
-        try (KuduClient sourceClient = 
buildClient(config.getSourceMasterAddresses());
-             KuduClient sinkClient = 
buildClient(config.getSinkMasterAddresses())) {
+    public ReplicationTableInitializer(ReplicationJobConfig config) {
+        sourceClient = new KuduClient.KuduClientBuilder(String.join(",", 
config.getSourceMasterAddresses())).build();
+        sinkClient = new KuduClient.KuduClientBuilder(String.join(",", 
config.getSinkMasterAddresses())).build();
+        sourceTableName = config.getTableName();
+        sinkTableName = config.getSinkTableName();
+        restoreOwner = config.getRestoreOwner();
+    }
 
-            String tableName = config.getTableName();
+    public void createTableIfNotExists() throws Exception {
+        if (!sinkClient.tableExists(sinkTableName)) {
+            try {
+                sourceTable = sourceClient.openTable(sourceTableName);
+                createTableRangePartitionByRangePartition();
+            } catch (Exception e) {
+                throw new RuntimeException("Failed to create table " + 
sinkTableName, e);
+            }
+        }
+    }
 
-            if (!sinkClient.tableExists(tableName)) {
-                KuduTable sourceTable = sourceClient.openTable(tableName);
-                Schema schema = sourceTable.getSchema();
-                CreateTableOptions options = extractCreateOptions(sourceTable);
+    private void createTableRangePartitionByRangePartition() throws Exception {
+        CreateTableOptions options = 
getCreateTableOptionsWithoutRangePartitions();
+        List<Partition> boundsWithoutHashSchema = 
sourceTable.getRangePartitions(sourceClient.getDefaultAdminOperationTimeoutMs());
+        List<RangePartitionWithCustomHashSchema> boundsWithCustomHashSchema = 
getRangeBoundsPartialRowsWithHashSchemas();
 
-                sinkClient.createTable(tableName, schema, options);
-            }
+        if (!boundsWithoutHashSchema.isEmpty()) {
+            
options.addRangePartition(boundsWithoutHashSchema.get(0).getDecodedRangeKeyStart(sourceTable),
+                                        
boundsWithoutHashSchema.get(0).getDecodedRangeKeyStart(sourceTable));
+            sinkClient.createTable(sinkTableName, sourceTable.getSchema(), 
options);
+
+            boundsWithoutHashSchema.stream().skip(1).forEach(partition -> {
+                AlterTableOptions alterOptions = new AlterTableOptions();
+                
options.addRangePartition(partition.getDecodedRangeKeyStart(sourceTable),
+                                            
partition.getDecodedRangeKeyStart(sourceTable));
+                try {
+                    sinkClient.alterTable(sinkTableName,alterOptions);
+                } catch (KuduException e) {
+                    throw new RuntimeException("Failed to alter table: " + 
sinkTableName, e);
+                }
+            });
+
+            boundsWithCustomHashSchema.stream().forEach(partition -> {
+                AlterTableOptions alterOptions = new AlterTableOptions();
+                alterOptions.addRangePartition(partition);
+                try {
+                    sinkClient.alterTable(sinkTableName, alterOptions);
+                } catch (KuduException e) {
+                    throw new RuntimeException("Failed to alter table: " + 
sinkTableName, e);
+                }
+            });
+
+
+        } else if (!boundsWithCustomHashSchema.isEmpty()) {
+            options.addRangePartition(boundsWithCustomHashSchema.get(0));
+            sinkClient.createTable(sinkTableName, sourceTable.getSchema(), 
options);
+
+            boundsWithCustomHashSchema.stream().skip(1).forEach(partition -> {
+                AlterTableOptions alterOptions = new AlterTableOptions();
+                alterOptions.addRangePartition(partition);
+                try {
+                    sinkClient.alterTable(sinkTableName, alterOptions);
+                } catch (KuduException e) {
+                    throw new RuntimeException("Failed to alter table: " + 
sinkTableName, e);
+                }
+            });
         }
+
     }
+    private CreateTableOptions getCreateTableOptionsWithoutRangePartitions() {
+        CreateTableOptions options = new CreateTableOptions();
+        if (restoreOwner) {
+            options.setOwner(sourceTable.getOwner());
+        }
+        options.setComment(sourceTable.getComment());
+        options.setNumReplicas(sourceTable.getNumReplicas());
+        options.setExtraConfigs(sourceTable.getExtraConfig());
+        PartitionSchema partitionSchema = sourceTable.getPartitionSchema();
+
+        List<PartitionSchema.HashBucketSchema> hashBucketSchemas = 
partitionSchema.getHashBucketSchemas();
+        for (PartitionSchema.HashBucketSchema hashBucketSchema : 
hashBucketSchemas) {
+            List<String> colNames = 
getColumnNamesFromColumnIds(hashBucketSchema.getColumnIds());
+            options.addHashPartitions(colNames, 
hashBucketSchema.getNumBuckets());
+        }
+
+        PartitionSchema.RangeSchema rangeSchema = 
partitionSchema.getRangeSchema();
+        List<String> colNames = 
getColumnNamesFromColumnIds(rangeSchema.getColumnIds());
+        options.setRangePartitionColumns(colNames);
 
-    static private KuduClient buildClient(List<String> masterAddresses) {
-        return new KuduClient.KuduClientBuilder(String.join(",", 
masterAddresses)).build();
+        return options;
     }
 
-    // TODO write this function!!
-    static private CreateTableOptions extractCreateOptions(KuduTable table) {
-        try {
-            CreateTableOptions createTableOptions = new CreateTableOptions();
-
-            createTableOptions.setComment(table.getComment());
-            createTableOptions.setNumReplicas(table.getNumReplicas());
-            createTableOptions.setExtraConfigs(table.getExtraConfig());
-            PartitionSchema ps = table.getPartitionSchema();
-
-            // Add hash partitions into the mix
-            List<PartitionSchema.HashBucketSchema> hb = 
ps.getHashBucketSchemas();
-            for (PartitionSchema.HashBucketSchema hbSchema : hb) {
-                List<String> colNames = new ArrayList<>();
-                for (int id : hbSchema.getColumnIds()) {
-                    int idx = table.getSchema().getColumnIndex(id);
-                    
colNames.add(table.getSchema().getColumnByIndex(idx).getName());
-                }
-                createTableOptions.addHashPartitions(colNames, 
hbSchema.getNumBuckets());
-            }
+    private List<RangePartitionWithCustomHashSchema> 
getRangeBoundsPartialRowsWithHashSchemas() {
+        List<RangePartitionWithCustomHashSchema> 
rangeBoundsPartialRowsWithHashSchemas = new ArrayList<>();
+        PartitionSchema partitionSchema = sourceTable.getPartitionSchema();
+        List<PartitionSchema.RangeWithHashSchema> rangesWithHashSchemas = 
partitionSchema.getRangesWithHashSchemas();
+        for (PartitionSchema.RangeWithHashSchema rangeWithHashSchema : 
rangesWithHashSchemas) {
+            RangePartitionWithCustomHashSchema partition = new 
RangePartitionWithCustomHashSchema(
+                    rangeWithHashSchema.lowerBound, 
rangeWithHashSchema.upperBound,
+                    RangePartitionBound.INCLUSIVE_BOUND, 
RangePartitionBound.EXCLUSIVE_BOUND);
 
-            PartitionSchema.RangeSchema rangeSchema = ps.getRangeSchema();
-            List<String> colNames = new ArrayList<>();
-            for (int id : rangeSchema.getColumnIds()) {
-                int idx = table.getSchema().getColumnIndex(id);
-                
colNames.add(table.getSchema().getColumnByIndex(idx).getName());
-            }
-            createTableOptions.setRangePartitionColumns(colNames);
-            List<Partition> ranges = table.getRangePartitions(9999);
-            for (Partition partition : ranges) {
-                
createTableOptions.addRangePartition(partition.getDecodedRangeKeyStart(table), 
partition.getDecodedRangeKeyEnd(table));
+            List<PartitionSchema.HashBucketSchema> hashBucketSchemas = 
rangeWithHashSchema.hashSchemas;
+            for (PartitionSchema.HashBucketSchema hashBucketSchema : 
hashBucketSchemas) {
+                List<String> colNames = 
getColumnNamesFromColumnIds(hashBucketSchema.getColumnIds());
+                partition.addHashPartitions(colNames, 
hashBucketSchema.getNumBuckets(), hashBucketSchema.getSeed());
             }
+            rangeBoundsPartialRowsWithHashSchemas.add(partition);
+        }
+        return rangeBoundsPartialRowsWithHashSchemas;
+    }
 
-            return createTableOptions;
-        } catch (RuntimeException | KuduException e) {
-            throw new RuntimeException(e);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
+    private List<String> getColumnNamesFromColumnIds(List<Integer> columnIds) {
+        List<String> columnNames = new ArrayList<>();
+        for (int id : columnIds) {
+            int idx = sourceTable.getSchema().getColumnIndex(id);
+            
columnNames.add(sourceTable.getSchema().getColumnByIndex(idx).getName());
         }
+        return columnNames;
+    }
+
+    @Override
+    public void close() throws Exception {
+        sourceClient.close();
+        sinkClient.close();
     }
 }
diff --git 
a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java
 
b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java
index e5f23477a..9a50f10a9 100644
--- 
a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java
+++ 
b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java
@@ -50,10 +50,6 @@ public class TestReplication {
             fail(e.getMessage());
         }
 
-        // We create the sink table here, temporary workaround to get things 
up and running.
-        // TODO: ideally we should not need to do this.
-//        KuduTable sinkTable = 
createDefaultTable(this.sinkHarness.getClient(), TABLE_NAME);
-
         ReplicationJobConfig config = new ReplicationJobConfig();
 
         
config.setSourceMasterAddresses(Arrays.asList(sourceHarness.getMasterAddressesAsString().split(",")));
@@ -63,7 +59,6 @@ public class TestReplication {
         ReplicationJobExecutor executor = new ReplicationJobExecutor(config);
         executor.runJob();
 
-//        sinkTable = sinkClient.openTable(TABLE_NAME);
         KuduTable sinkTable = sinkClient.openTable(TABLE_NAME);
         assertEquals(1000, countRowsInTable(sinkTable));
     }

Reply via email to