This is an automated email from the ASF dual-hosted git repository.

mgreber pushed a commit to branch flink-replication
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit b4b4642e6877f0f6993ea3d9dbde87d4a5344959
Author: Marton Greber <[email protected]>
AuthorDate: Tue Apr 8 14:19:34 2025 +0200

    table init should work now
---
 .../apache/kudu/replication/ReplicationJob.java    |  10 ++
 .../kudu/replication/ReplicationJobConfig.java     |  31 ++--
 .../kudu/replication/ReplicationJobExecutor.java   |   4 +-
 .../replication/ReplicationTableInitializer.java   | 169 +++++++++++++++------
 .../apache/kudu/replication/TestReplication.java   |   5 -
 5 files changed, 153 insertions(+), 66 deletions(-)

diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java
index ba41d4ca8..bb574ae34 100644
--- a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java
+++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java
@@ -44,6 +44,16 @@ public class ReplicationJob {
     config.setSinkMasterAddresses(new ArrayList<>(
         Collections.singletonList(parameters.get("sinkMasterAddresses", "127.0.0.1:8764"))));
     config.setTableName(parameters.get("tableName", "test_table"));
+    if (parameters.has("restoreOwner")) {
+      config.setRestoreOwner(Boolean.parseBoolean(parameters.get("restoreOwner", "false")));
+    }
+    if (parameters.has("tableSuffix")) {
+      config.setTableSuffix(parameters.get("tableSuffix", ""));
+    }
+
+
+    // Get all the params that can be used to tune the reader/writer configs and pass them down to the job executor here.
+
     ReplicationJobExecutor replicationJobExecutor = new ReplicationJobExecutor(config);
     replicationJobExecutor.runJob();

diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
index ca236babf..ed6fc3578 100644
--- a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
+++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
@@ -27,35 +27,26 @@ public class ReplicationJobConfig implements Serializable {
   private List<String> sourceMasterAddresses;
   private List<String> sinkMasterAddresses;
   private String tableName;
+  private Boolean restoreOwner;
+  private String tableSuffix;
+
   public List<String> getSourceMasterAddresses() {
     return sourceMasterAddresses;
   }
 
-  /**
-   * @param sourceMasterAddresses
-   */
   public void setSourceMasterAddresses(List<String> sourceMasterAddresses) {
     this.sourceMasterAddresses = sourceMasterAddresses;
   }
 
-  /**
-   * @return List of master addresses in host:port format.
-   */
   public List<String> getSinkMasterAddresses() {
     return sinkMasterAddresses;
   }
 
-  /**
-   * @param sinkMasterAddresses
-   */
   public void setSinkMasterAddresses(List<String> sinkMasterAddresses) {
     this.sinkMasterAddresses = sinkMasterAddresses;
   }
 
-  /**
-   * @return Name of the table the configuration is created for.
-   */
   public String getTableName() {
     return this.tableName;
   }
@@ -63,4 +54,20 @@ public class ReplicationJobConfig implements Serializable {
   public void setTableName(String tableName) {
     this.tableName = tableName;
   }
+
+  public Boolean getRestoreOwner() {
+    return restoreOwner;
+  }
+  public void setRestoreOwner(Boolean restoreOwner) {
+    this.restoreOwner = restoreOwner;
+  }
+  public String getTableSuffix() {
+    return tableSuffix;
+  }
+  public void setTableSuffix(String tableSuffix) {
+    this.tableSuffix = tableSuffix;
+  }
+  public String getSinkTableName() {
+    return tableName + tableSuffix;
+  }
 }

diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java
index d51f1504b..14af67394 100644
--- a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java
+++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java
@@ -80,7 +80,9 @@ class ReplicationJobExecutor {
    */
   public void runJob() throws Exception {
-    ReplicationTableInitializer.createTableIfNotExists(config);
+    try (ReplicationTableInitializer tableInitializer = new ReplicationTableInitializer(config)) {
+      tableInitializer.createTableIfNotExists();
+    }
 
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     KuduSink<Row> kuduSink = new KuduSinkBuilder<Row>()

diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationTableInitializer.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationTableInitializer.java
index 03ed2f523..7cd583c65 100644
--- a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationTableInitializer.java
+++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationTableInitializer.java
@@ -1,76 +1,149 @@
 package org.apache.kudu.replication;
 
 import org.apache.kudu.Schema;
+import org.apache.kudu.client.AlterTableOptions;
 import org.apache.kudu.client.CreateTableOptions;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduException;
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.Partition;
 import org.apache.kudu.client.PartitionSchema;
+import org.apache.kudu.client.RangePartitionBound;
+import org.apache.kudu.client.RangePartitionWithCustomHashSchema;
 
 import java.util.ArrayList;
 import java.util.List;
 
-public class ReplicationTableInitializer {
+public class ReplicationTableInitializer implements AutoCloseable {
+  private final KuduClient sourceClient;
+  private final KuduClient sinkClient;
+  private final String sourceTableName;
+  private final String sinkTableName;
+  private final Boolean restoreOwner;
+  private KuduTable sourceTable;
 
-  static public void createTableIfNotExists(ReplicationJobConfig config) throws KuduException {
-    try (KuduClient sourceClient = buildClient(config.getSourceMasterAddresses());
-         KuduClient sinkClient = buildClient(config.getSinkMasterAddresses())) {
+  public ReplicationTableInitializer(ReplicationJobConfig config) {
+    sourceClient = new KuduClient.KuduClientBuilder(
+        String.join(",", config.getSourceMasterAddresses())).build();
+    sinkClient = new KuduClient.KuduClientBuilder(
+        String.join(",", config.getSinkMasterAddresses())).build();
+    sourceTableName = config.getTableName();
+    sinkTableName = config.getSinkTableName();
+    restoreOwner = config.getRestoreOwner();
+  }
 
-      String tableName = config.getTableName();
+  public void createTableIfNotExists() throws Exception {
+    if (!sinkClient.tableExists(sinkTableName)) {
+      try {
+        sourceTable = sourceClient.openTable(sourceTableName);
+        createTableRangePartitionByRangePartition();
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to create table " + sinkTableName, e);
+      }
+    }
+  }
 
-      if (!sinkClient.tableExists(tableName)) {
-        KuduTable sourceTable = sourceClient.openTable(tableName);
-        Schema schema = sourceTable.getSchema();
-        CreateTableOptions options = extractCreateOptions(sourceTable);
+  // Create the sink table with the first range partition only, then add the
+  // remaining range partitions (including ranges with custom hash schemas)
+  // one by one through alter table operations.
+  private void createTableRangePartitionByRangePartition() throws Exception {
+    CreateTableOptions options = getCreateTableOptionsWithoutRangePartitions();
+    List<Partition> boundsWithoutHashSchema =
+        sourceTable.getRangePartitions(sourceClient.getDefaultAdminOperationTimeoutMs());
+    List<RangePartitionWithCustomHashSchema> boundsWithCustomHashSchema =
+        getRangeBoundsPartialRowsWithHashSchemas();
 
-        sinkClient.createTable(tableName, schema, options);
-      }
+    if (!boundsWithoutHashSchema.isEmpty()) {
+      options.addRangePartition(
+          boundsWithoutHashSchema.get(0).getDecodedRangeKeyStart(sourceTable),
+          boundsWithoutHashSchema.get(0).getDecodedRangeKeyEnd(sourceTable));
+      sinkClient.createTable(sinkTableName, sourceTable.getSchema(), options);
+
+      boundsWithoutHashSchema.stream().skip(1).forEach(partition -> {
+        AlterTableOptions alterOptions = new AlterTableOptions();
+        alterOptions.addRangePartition(partition.getDecodedRangeKeyStart(sourceTable),
+            partition.getDecodedRangeKeyEnd(sourceTable));
+        try {
+          sinkClient.alterTable(sinkTableName, alterOptions);
+        } catch (KuduException e) {
+          throw new RuntimeException("Failed to alter table: " + sinkTableName, e);
+        }
+      });
+
+      boundsWithCustomHashSchema.stream().forEach(partition -> {
+        AlterTableOptions alterOptions = new AlterTableOptions();
+        alterOptions.addRangePartition(partition);
+        try {
+          sinkClient.alterTable(sinkTableName, alterOptions);
+        } catch (KuduException e) {
+          throw new RuntimeException("Failed to alter table: " + sinkTableName, e);
+        }
+      });
+    } else if (!boundsWithCustomHashSchema.isEmpty()) {
+      options.addRangePartition(boundsWithCustomHashSchema.get(0));
+      sinkClient.createTable(sinkTableName, sourceTable.getSchema(), options);
+
+      boundsWithCustomHashSchema.stream().skip(1).forEach(partition -> {
+        AlterTableOptions alterOptions = new AlterTableOptions();
+        alterOptions.addRangePartition(partition);
+        try {
+          sinkClient.alterTable(sinkTableName, alterOptions);
+        } catch (KuduException e) {
+          throw new RuntimeException("Failed to alter table: " + sinkTableName, e);
+        }
+      });
     }
+  }
 
+  private CreateTableOptions getCreateTableOptionsWithoutRangePartitions() {
+    CreateTableOptions options = new CreateTableOptions();
+    // restoreOwner is only set when the "restoreOwner" parameter is passed in,
+    // so guard against the unset (null) case.
+    if (Boolean.TRUE.equals(restoreOwner)) {
+      options.setOwner(sourceTable.getOwner());
+    }
+    options.setComment(sourceTable.getComment());
+    options.setNumReplicas(sourceTable.getNumReplicas());
+    options.setExtraConfigs(sourceTable.getExtraConfig());
+    PartitionSchema partitionSchema = sourceTable.getPartitionSchema();
+
+    List<PartitionSchema.HashBucketSchema> hashBucketSchemas = partitionSchema.getHashBucketSchemas();
+    for (PartitionSchema.HashBucketSchema hashBucketSchema : hashBucketSchemas) {
+      List<String> colNames = getColumnNamesFromColumnIds(hashBucketSchema.getColumnIds());
+      options.addHashPartitions(colNames, hashBucketSchema.getNumBuckets());
+    }
+
+    PartitionSchema.RangeSchema rangeSchema = partitionSchema.getRangeSchema();
+    List<String> colNames = getColumnNamesFromColumnIds(rangeSchema.getColumnIds());
+    options.setRangePartitionColumns(colNames);
 
-  static private KuduClient buildClient(List<String> masterAddresses) {
-    return new KuduClient.KuduClientBuilder(String.join(",", masterAddresses)).build();
+    return options;
   }
 
-  // TODO write this function!!
-  static private CreateTableOptions extractCreateOptions(KuduTable table) {
-    try {
-      CreateTableOptions createTableOptions = new CreateTableOptions();
-
-      createTableOptions.setComment(table.getComment());
-      createTableOptions.setNumReplicas(table.getNumReplicas());
-      createTableOptions.setExtraConfigs(table.getExtraConfig());
-      PartitionSchema ps = table.getPartitionSchema();
-
-      // Add hash partitions into the mix
-      List<PartitionSchema.HashBucketSchema> hb = ps.getHashBucketSchemas();
-      for (PartitionSchema.HashBucketSchema hbSchema : hb) {
-        List<String> colNames = new ArrayList<>();
-        for (int id : hbSchema.getColumnIds()) {
-          int idx = table.getSchema().getColumnIndex(id);
-          colNames.add(table.getSchema().getColumnByIndex(idx).getName());
-        }
-        createTableOptions.addHashPartitions(colNames, hbSchema.getNumBuckets());
-      }
+  private List<RangePartitionWithCustomHashSchema> getRangeBoundsPartialRowsWithHashSchemas() {
+    List<RangePartitionWithCustomHashSchema> rangeBoundsPartialRowsWithHashSchemas = new ArrayList<>();
+    PartitionSchema partitionSchema = sourceTable.getPartitionSchema();
+    List<PartitionSchema.RangeWithHashSchema> rangesWithHashSchemas = partitionSchema.getRangesWithHashSchemas();
+    for (PartitionSchema.RangeWithHashSchema rangeWithHashSchema : rangesWithHashSchemas) {
+      RangePartitionWithCustomHashSchema partition = new RangePartitionWithCustomHashSchema(
+          rangeWithHashSchema.lowerBound, rangeWithHashSchema.upperBound,
+          RangePartitionBound.INCLUSIVE_BOUND, RangePartitionBound.EXCLUSIVE_BOUND);
 
-      PartitionSchema.RangeSchema rangeSchema = ps.getRangeSchema();
-      List<String> colNames = new ArrayList<>();
-      for (int id : rangeSchema.getColumnIds()) {
-        int idx = table.getSchema().getColumnIndex(id);
-        colNames.add(table.getSchema().getColumnByIndex(idx).getName());
-      }
-      createTableOptions.setRangePartitionColumns(colNames);
-      List<Partition> ranges = table.getRangePartitions(9999);
-      for (Partition partition : ranges) {
-        createTableOptions.addRangePartition(partition.getDecodedRangeKeyStart(table), partition.getDecodedRangeKeyEnd(table));
+      List<PartitionSchema.HashBucketSchema> hashBucketSchemas = rangeWithHashSchema.hashSchemas;
+      for (PartitionSchema.HashBucketSchema hashBucketSchema : hashBucketSchemas) {
+        List<String> colNames = getColumnNamesFromColumnIds(hashBucketSchema.getColumnIds());
+        partition.addHashPartitions(colNames, hashBucketSchema.getNumBuckets(), hashBucketSchema.getSeed());
       }
+      rangeBoundsPartialRowsWithHashSchemas.add(partition);
+    }
+    return rangeBoundsPartialRowsWithHashSchemas;
+  }
 
-      return createTableOptions;
-    } catch (RuntimeException | KuduException e) {
-      throw new RuntimeException(e);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
+  private List<String> getColumnNamesFromColumnIds(List<Integer> columnIds) {
+    List<String> columnNames = new ArrayList<>();
+    for (int id : columnIds) {
+      int idx = sourceTable.getSchema().getColumnIndex(id);
+      columnNames.add(sourceTable.getSchema().getColumnByIndex(idx).getName());
     }
+    return columnNames;
+  }
+
+  @Override
+  public void close() throws Exception {
+    sourceClient.close();
+    sinkClient.close();
   }
 }
diff --git a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java
index e5f23477a..9a50f10a9 100644
--- a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java
+++ b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java
@@ -50,10 +50,6 @@ public class TestReplication {
       fail(e.getMessage());
     }
 
-    // We create the sink table here, temporary workaround to get things up and running.
-    // TODO: ideally we should not need to do this.
-//    KuduTable sinkTable = createDefaultTable(this.sinkHarness.getClient(), TABLE_NAME);
-
     ReplicationJobConfig config = new ReplicationJobConfig();
     config.setSourceMasterAddresses(Arrays.asList(sourceHarness.getMasterAddressesAsString().split(",")));
@@ -63,7 +59,6 @@ public class TestReplication {
     ReplicationJobExecutor executor = new ReplicationJobExecutor(config);
     executor.runJob();
 
-//    sinkTable = sinkClient.openTable(TABLE_NAME);
     KuduTable sinkTable = sinkClient.openTable(TABLE_NAME);
     assertEquals(1000, countRowsInTable(sinkTable));
   }
