This is an automated email from the ASF dual-hosted git repository.
abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new fac331784 KUDU-3662 [6/n] Add sink table initialization
fac331784 is described below
commit fac331784f535d80fc304940e50bf0d94b4117e1
Author: Marton Greber <[email protected]>
AuthorDate: Tue Aug 19 17:42:05 2025 +0200
KUDU-3662 [6/n] Add sink table initialization
Adds automatic table creation to eliminate manual sink table setup.
The createTable flag in ReplicationJobConfig enables automatic
recreation of source table schema, partitioning, and metadata on
the destination cluster.
ReplicationTableInitializer handles all Kudu partitioning schemes
including hash-only, range-only, combined hash/range, and custom
hash schemas per range. Implementation ports logic from Scala
backup/restore code in KuduRestore for proven reliability.
Add unit tests to verify table initialization functionality across
all supported partitioning types.
Change-Id: I1002b4ba272c1acaab351e3ff3f341ca327070d2
Reviewed-on: http://gerrit.cloudera.org:8080/23451
Reviewed-by: Zoltan Chovan <[email protected]>
Reviewed-by: Attila Bukor <[email protected]>
Tested-by: Attila Bukor <[email protected]>
---
.../kudu/replication/ReplicationConfigParser.java | 7 +
.../kudu/replication/ReplicationEnvProvider.java | 7 +
.../kudu/replication/ReplicationJobConfig.java | 18 +-
.../replication/ReplicationTableInitializer.java | 214 ++++++++++++
.../TestReplicationTableInitializer.java | 368 +++++++++++++++++++++
5 files changed, 612 insertions(+), 2 deletions(-)
diff --git
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationConfigParser.java
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationConfigParser.java
index b68974da4..829b898db 100644
---
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationConfigParser.java
+++
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationConfigParser.java
@@ -51,6 +51,9 @@ public class ReplicationConfigParser {
* (default: empty string)</li>
* <li>{@code job.discoveryIntervalSeconds} (optional) – interval in
seconds at which the source
* tries to perform a diff scan to get new or changed data (default:
300)</li>
+ * <li>{@code job.createTable} (optional) – whether to create the sink
table if it not exists.
+ * This setting recreates the partition schema that is present on the
source side.
+ * (default: false)</li>
* </ul>
*
* @param params the Flink {@code ParameterTool} containing command-line
parameters
@@ -75,6 +78,10 @@ public class ReplicationConfigParser {
builder.setDiscoveryIntervalSeconds(params.getInt("job.discoveryIntervalSeconds"));
}
+ if (params.has("job.createTable")) {
+ builder.setCreateTable(params.getBoolean("job.createTable"));
+ }
+
return builder.build();
}
diff --git
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java
index 672131303..923cdc9cb 100644
---
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java
+++
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java
@@ -58,6 +58,13 @@ public class ReplicationEnvProvider {
* @throws Exception if table initialization or environment setup fails
*/
public StreamExecutionEnvironment getEnv() throws Exception {
+ if (jobConfig.getCreateTable()) {
+ try (ReplicationTableInitializer tableInitializer =
+ new ReplicationTableInitializer(jobConfig)) {
+ tableInitializer.createTableIfNotExists();
+ }
+ }
+
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
KuduSource<Row> kuduSource = KuduSource.<Row>builder()
diff --git
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
index f65ea20a5..7b785554e 100644
---
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
+++
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
@@ -37,6 +37,7 @@ public class ReplicationJobConfig implements Serializable {
private final boolean restoreOwner;
private final String tableSuffix;
private final long discoveryIntervalSeconds;
+ private final boolean createTable;
private ReplicationJobConfig(
String sourceMasterAddresses,
@@ -44,7 +45,8 @@ public class ReplicationJobConfig implements Serializable {
String tableName,
boolean restoreOwner,
String tableSuffix,
- long discoveryIntervalSeconds) {
+ long discoveryIntervalSeconds,
+ boolean createTable) {
this.sourceMasterAddresses = checkNotNull(sourceMasterAddresses,
"sourceMasterAddresses cannot be null");
this.sinkMasterAddresses = checkNotNull(sinkMasterAddresses,
@@ -53,6 +55,7 @@ public class ReplicationJobConfig implements Serializable {
this.restoreOwner = restoreOwner;
this.tableSuffix = tableSuffix != null ? tableSuffix : "";
this.discoveryIntervalSeconds = discoveryIntervalSeconds;
+ this.createTable = createTable;
}
public String getSourceMasterAddresses() {
@@ -79,6 +82,10 @@ public class ReplicationJobConfig implements Serializable {
return discoveryIntervalSeconds;
}
+ public boolean getCreateTable() {
+ return createTable;
+ }
+
public String getSinkTableName() {
return tableName + tableSuffix;
}
@@ -98,6 +105,7 @@ public class ReplicationJobConfig implements Serializable {
private String tableSuffix = "";
// The default discover interval is 5 minutes.
private long discoveryIntervalSeconds = 5 * 60;
+ private boolean createTable = false;
public Builder setSourceMasterAddresses(String sourceMasterAddresses) {
this.sourceMasterAddresses = sourceMasterAddresses;
@@ -129,6 +137,11 @@ public class ReplicationJobConfig implements Serializable {
return this;
}
+ public Builder setCreateTable(boolean createTable) {
+ this.createTable = createTable;
+ return this;
+ }
+
public ReplicationJobConfig build() {
return new ReplicationJobConfig(
sourceMasterAddresses,
@@ -136,7 +149,8 @@ public class ReplicationJobConfig implements Serializable {
tableName,
restoreOwner,
tableSuffix,
- discoveryIntervalSeconds);
+ discoveryIntervalSeconds,
+ createTable);
}
}
diff --git
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationTableInitializer.java
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationTableInitializer.java
new file mode 100644
index 000000000..f99bc8f5f
--- /dev/null
+++
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationTableInitializer.java
@@ -0,0 +1,214 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.kudu.replication;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kudu.client.AlterTableOptions;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Partition;
+import org.apache.kudu.client.PartitionSchema;
+import org.apache.kudu.client.RangePartitionBound;
+import org.apache.kudu.client.RangePartitionWithCustomHashSchema;
+
+/**
+ * Helper class that initializes the sink-side table for replication.
+ * If the table does not exist on the sink, it creates an identical table
+ * with the same schema, table parameters, and partitioning scheme as the
source.
+ *
+ * <p>This implementation is based on the Scala backup/restore logic from the
+ * kudu-backup module, specifically the
+ * KuduRestore.createTableRangePartitionByRangePartition method.
+ *
+ * <p>The partition recreation logic closely follows the patterns established
+ * in the Kudu backup/restore functionality to ensure correct handling of:
+ * - Table-wide hash schemas
+ * - Range partitions with custom hash schemas
+ * - Mixed partition scenarios
+ */
+public class ReplicationTableInitializer implements AutoCloseable {
+ private final KuduClient sourceClient;
+ private final KuduClient sinkClient;
+ private final ReplicationJobConfig config;
+ private KuduTable sourceTable;
+
+ public ReplicationTableInitializer(ReplicationJobConfig config) {
+ sourceClient = new KuduClient.KuduClientBuilder(
+ String.join(",", config.getSourceMasterAddresses())).build();
+ sinkClient = new KuduClient.KuduClientBuilder(
+ String.join(",", config.getSinkMasterAddresses())).build();
+ this.config = config;
+ }
+
+ public void createTableIfNotExists() throws Exception {
+ if (!sinkClient.tableExists(config.getSinkTableName())) {
+ try {
+ sourceTable = sourceClient.openTable(config.getTableName());
+ createTableRangePartitionByRangePartition();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create table " +
config.getSinkTableName(), e);
+ }
+ }
+ }
+
+ /**
+ * Creates table with range partitions, handling both table-wide and custom
hash schemas.
+ *
+ * <p>This method is a Java port of the Scala method with the same name from
+ * the kudu-backup module in
KuduRestore.createTableRangePartitionByRangePartition.
+ *
+ * <p>The logic follows the same pattern:
+ * 1. Get ranges with table-wide hash schema vs custom hash schema separately
+ * 2. Create table with first range (if any)
+ * 3. Add remaining ranges via ALTER TABLE operations
+ */
+ private void createTableRangePartitionByRangePartition() throws Exception {
+ CreateTableOptions options = getCreateTableOptionsWithoutRangePartitions();
+ List<Partition> rangePartitionsWithTableHashSchema =
+ sourceTable.getRangePartitionsWithTableHashSchema(
+ sourceClient.getDefaultAdminOperationTimeoutMs());
+ List<RangePartitionWithCustomHashSchema> boundsWithCustomHashSchema =
+ getRangeBoundsPartialRowsWithHashSchemas();
+
+ if (!rangePartitionsWithTableHashSchema.isEmpty()) {
+ // Adds the first range partition with table wide hash schema through
create.
+ options.addRangePartition(
+
rangePartitionsWithTableHashSchema.get(0).getDecodedRangeKeyStart(sourceTable),
+
rangePartitionsWithTableHashSchema.get(0).getDecodedRangeKeyEnd(sourceTable));
+ sinkClient.createTable(config.getSinkTableName(),
sourceTable.getSchema(), options);
+
+ // Add the rest of the range partitions with table wide hash schema
through alters.
+ rangePartitionsWithTableHashSchema.stream().skip(1).forEach(partition ->
{
+ AlterTableOptions alterOptions = new AlterTableOptions();
+
alterOptions.addRangePartition(partition.getDecodedRangeKeyStart(sourceTable),
+ partition.getDecodedRangeKeyEnd(sourceTable));
+ try {
+ sinkClient.alterTable(config.getSinkTableName(), alterOptions);
+ } catch (KuduException e) {
+ throw new RuntimeException("Failed to alter table: " +
config.getSinkTableName(), e);
+ }
+ });
+
+ // adds range partitions with custom hash schema through alters.
+ boundsWithCustomHashSchema.stream().forEach(partition -> {
+ AlterTableOptions alterOptions = new AlterTableOptions();
+ alterOptions.addRangePartition(partition);
+ try {
+ sinkClient.alterTable(config.getSinkTableName(), alterOptions);
+ } catch (KuduException e) {
+ throw new RuntimeException("Failed to alter table: " +
config.getSinkTableName(), e);
+ }
+ });
+
+
+ } else if (!boundsWithCustomHashSchema.isEmpty()) {
+ // Adds first range partition with custom hash schema through create.
+ options.addRangePartition(boundsWithCustomHashSchema.get(0));
+ sinkClient.createTable(config.getSinkTableName(),
sourceTable.getSchema(), options);
+
+ // Adds rest of range partitions with custom hash schema through alters.
+ boundsWithCustomHashSchema.stream().skip(1).forEach(partition -> {
+ AlterTableOptions alterOptions = new AlterTableOptions();
+ alterOptions.addRangePartition(partition);
+ try {
+ sinkClient.alterTable(config.getSinkTableName(), alterOptions);
+ } catch (KuduException e) {
+ throw new RuntimeException("Failed to alter table: " +
config.getSinkTableName(), e);
+ }
+ });
+ }
+
+ }
+
+ /**
+ * Creates base table options including table-wide hash schema but without
range partitions.
+ *
+ * <p>This implementation corresponds to the Scala method
+ * TableMetadata.getCreateTableOptionsWithoutRangePartitions from the
kudu-backup-common module.
+ */
+ private CreateTableOptions getCreateTableOptionsWithoutRangePartitions() {
+ CreateTableOptions options = new CreateTableOptions();
+ if (config.getRestoreOwner()) {
+ options.setOwner(sourceTable.getOwner());
+ }
+ options.setComment(sourceTable.getComment());
+ options.setNumReplicas(sourceTable.getNumReplicas());
+ options.setExtraConfigs(sourceTable.getExtraConfig());
+ PartitionSchema partitionSchema = sourceTable.getPartitionSchema();
+
+ List<PartitionSchema.HashBucketSchema> hashBucketSchemas =
+ partitionSchema.getHashBucketSchemas();
+ for (PartitionSchema.HashBucketSchema hashBucketSchema :
hashBucketSchemas) {
+ List<String> colNames =
getColumnNamesFromColumnIds(hashBucketSchema.getColumnIds());
+ options.addHashPartitions(colNames, hashBucketSchema.getNumBuckets(),
+ hashBucketSchema.getSeed());
+ }
+
+ PartitionSchema.RangeSchema rangeSchema = partitionSchema.getRangeSchema();
+ List<String> colNames =
getColumnNamesFromColumnIds(rangeSchema.getColumnIds());
+ options.setRangePartitionColumns(colNames);
+
+ return options;
+ }
+
+ /**
+ * Extracts range partitions that have custom hash schemas (different from
table-wide schema).
+ *
+ * <p>This implementation corresponds to the Scala method
+ * TableMetadata.getRangeBoundsPartialRowsWithHashSchemas from the
kudu-backup-common module.
+ */
+ private List<RangePartitionWithCustomHashSchema>
getRangeBoundsPartialRowsWithHashSchemas() {
+ List<RangePartitionWithCustomHashSchema>
rangeBoundsPartialRowsWithHashSchemas =
+ new ArrayList<>();
+ PartitionSchema partitionSchema = sourceTable.getPartitionSchema();
+ List<PartitionSchema.RangeWithHashSchema> rangesWithHashSchemas =
+ partitionSchema.getRangesWithHashSchemas();
+ for (PartitionSchema.RangeWithHashSchema rangeWithHashSchema :
rangesWithHashSchemas) {
+ RangePartitionWithCustomHashSchema partition = new
RangePartitionWithCustomHashSchema(
+ rangeWithHashSchema.lowerBound, rangeWithHashSchema.upperBound,
+ RangePartitionBound.INCLUSIVE_BOUND,
RangePartitionBound.EXCLUSIVE_BOUND);
+
+ List<PartitionSchema.HashBucketSchema> hashBucketSchemas =
rangeWithHashSchema.hashSchemas;
+ for (PartitionSchema.HashBucketSchema hashBucketSchema :
hashBucketSchemas) {
+ List<String> colNames =
getColumnNamesFromColumnIds(hashBucketSchema.getColumnIds());
+ partition.addHashPartitions(colNames, hashBucketSchema.getNumBuckets(),
+ hashBucketSchema.getSeed());
+ }
+ rangeBoundsPartialRowsWithHashSchemas.add(partition);
+ }
+ return rangeBoundsPartialRowsWithHashSchemas;
+ }
+
+ private List<String> getColumnNamesFromColumnIds(List<Integer> columnIds) {
+ List<String> columnNames = new ArrayList<>();
+ for (int id : columnIds) {
+ int idx = sourceTable.getSchema().getColumnIndex(id);
+ columnNames.add(sourceTable.getSchema().getColumnByIndex(idx).getName());
+ }
+ return columnNames;
+ }
+
+ @Override
+ public void close() throws Exception {
+ sourceClient.close();
+ sinkClient.close();
+ }
+}
+
diff --git
a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationTableInitializer.java
b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationTableInitializer.java
new file mode 100644
index 000000000..a9b00776b
--- /dev/null
+++
b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationTableInitializer.java
@@ -0,0 +1,368 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.kudu.replication;
+
+import static org.apache.kudu.test.ClientTestUtil.countRowsInTable;
+import static org.apache.kudu.test.junit.AssertHelpers.assertEventuallyTrue;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.Partition;
+import org.apache.kudu.client.PartitionSchema;
+import org.apache.kudu.client.RangePartitionBound;
+import org.apache.kudu.client.RangePartitionWithCustomHashSchema;
+
+public class TestReplicationTableInitializer extends ReplicationTestBase {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TestReplicationTableInitializer.class);
+
+ @Override
+ protected ReplicationJobConfig createDefaultJobConfig() {
+ return ReplicationJobConfig.builder()
+
.setSourceMasterAddresses(sourceHarness.getMasterAddressesAsString())
+ .setSinkMasterAddresses(sinkHarness.getMasterAddressesAsString())
+ .setTableName(TABLE_NAME)
+ .setDiscoveryIntervalSeconds(2)
+ .setCreateTable(true)
+ .build();
+ }
+
+ @Test
+ public void testTableInitializationSmoke() throws Exception {
+ createAllTypesTable(sourceClient);
+ insertRowsIntoAllTypesTable(sourceClient, 0, 10);
+
+ envProvider.getEnv().executeAsync();
+
+ KuduTable sinkTable = sinkClient.openTable(TABLE_NAME);
+ assertEventuallyTrue("Initial 10 rows should be replicated",
+ () -> countRowsInTable(sinkTable) == 10, 60000);
+
+ verifySourceAndSinkRowsEqual(10);
+ }
+
+ @Test
+ public void testHashOnlyPartitioning() throws Exception {
+ // Create table with only hash partitioning (no range partitions)
+ Schema schema = createTestSchema();
+ CreateTableOptions options = new CreateTableOptions()
+ .setRangePartitionColumns(Collections.emptyList())
+ .addHashPartitions(Collections.singletonList("key1"), 3)
+ .addHashPartitions(Arrays.asList("key2", "key3"), 2, 42);
+
+ sourceClient.createTable(TABLE_NAME, schema, options);
+ insertTestRows(sourceClient, 0, 10);
+
+ envProvider.getEnv().executeAsync();
+
+ KuduTable sinkTable = sinkClient.openTable(TABLE_NAME);
+ assertEventuallyTrue("Hash partitioned table should be replicated",
+ () -> countRowsInTable(sinkTable) == 10, 60000);
+
+ verifyPartitionSchemasMatch();
+ verifyTestRowsEqual(10);
+ }
+
+ @Test
+ public void testRangeOnlyPartitioning() throws Exception {
+ // Create table with only range partitioning (no hash)
+ Schema schema = createTestSchema();
+ CreateTableOptions options = new CreateTableOptions()
+ .setRangePartitionColumns(Collections.singletonList("key1"));
+
+ // Add range splits
+ PartialRow split1 = schema.newPartialRow();
+ split1.addInt("key1", 100);
+ options.addSplitRow(split1);
+
+ PartialRow split2 = schema.newPartialRow();
+ split2.addInt("key1", 200);
+ options.addSplitRow(split2);
+
+ sourceClient.createTable(TABLE_NAME, schema, options);
+ insertTestRows(sourceClient, 0, 15);
+
+ envProvider.getEnv().executeAsync();
+
+ KuduTable sinkTable = sinkClient.openTable(TABLE_NAME);
+ assertEventuallyTrue("Range partitioned table should be replicated",
+ () -> countRowsInTable(sinkTable) == 15, 60000);
+
+ verifyPartitionSchemasMatch();
+ verifyTestRowsEqual(15);
+ }
+
+ @Test
+ public void testHashAndRangePartitioning() throws Exception {
+ // Create table with both hash and range partitioning
+ Schema schema = createTestSchema();
+ CreateTableOptions options = new CreateTableOptions()
+ .setRangePartitionColumns(Collections.singletonList("key1"))
+ .addHashPartitions(Collections.singletonList("key2"), 2);
+
+ // Add range splits
+ PartialRow split1 = schema.newPartialRow();
+ split1.addInt("key1", 50);
+ options.addSplitRow(split1);
+
+ PartialRow split2 = schema.newPartialRow();
+ split2.addInt("key1", 150);
+ options.addSplitRow(split2);
+
+ sourceClient.createTable(TABLE_NAME, schema, options);
+ insertTestRows(sourceClient, 0, 12);
+
+ envProvider.getEnv().executeAsync();
+
+ KuduTable sinkTable = sinkClient.openTable(TABLE_NAME);
+ assertEventuallyTrue("Hash+range partitioned table should be replicated",
+ () -> countRowsInTable(sinkTable) == 12, 60000);
+
+ verifyPartitionSchemasMatch();
+ verifyTestRowsEqual(12);
+ }
+
+ @Test
+ public void testRangeWithCustomHashSchemas() throws Exception {
+ // Create table with range partitions having different hash schemas per
range
+ Schema schema = createTestSchema();
+ final CreateTableOptions options = new CreateTableOptions()
+ .setRangePartitionColumns(Collections.singletonList("key1"))
+ .addHashPartitions(Collections.singletonList("key1"), 2);
+
+ // Add range with custom hash schema only
+ PartialRow lower = schema.newPartialRow();
+ lower.addInt("key1", 100);
+ PartialRow upper = schema.newPartialRow();
+ upper.addInt("key1", 200);
+
+ RangePartitionWithCustomHashSchema customRange =
+ new RangePartitionWithCustomHashSchema(
+ lower, upper,
+ RangePartitionBound.INCLUSIVE_BOUND,
+ RangePartitionBound.EXCLUSIVE_BOUND);
+ customRange.addHashPartitions(Collections.singletonList("key1"), 5, 123);
+ // Different bucket count and seed
+ options.addRangePartition(customRange);
+
+ sourceClient.createTable(TABLE_NAME, schema, options);
+ insertTestRows(sourceClient, 100, 10);
+
+ envProvider.getEnv().executeAsync();
+
+ KuduTable sinkTable = sinkClient.openTable(TABLE_NAME);
+ assertEventuallyTrue("Custom hash schema table should be replicated",
+ () -> countRowsInTable(sinkTable) == 10, 60000);
+
+ verifyPartitionSchemasMatch();
+ verifyTestRowsEqual(10);
+ }
+
+ @Test
+ public void testNonCoveredRangePartitioning() throws Exception {
+ // Create table with explicit range boundaries (non-covered ranges)
+ Schema schema = createTestSchema();
+ CreateTableOptions options = new CreateTableOptions()
+ .setRangePartitionColumns(Collections.singletonList("key1"));
+
+ // Add explicit range partitions with gaps
+ PartialRow lower1 = schema.newPartialRow();
+ lower1.addInt("key1", 0);
+ PartialRow upper1 = schema.newPartialRow();
+ upper1.addInt("key1", 50);
+ options.addRangePartition(lower1, upper1);
+
+ // Gap from 50-100
+ PartialRow lower2 = schema.newPartialRow();
+ lower2.addInt("key1", 100);
+ PartialRow upper2 = schema.newPartialRow();
+ upper2.addInt("key1", 200);
+ options.addRangePartition(lower2, upper2);
+
+ sourceClient.createTable(TABLE_NAME, schema, options);
+ insertTestRows(sourceClient, 0, 5);
+ insertTestRows(sourceClient, 100, 5);
+
+ envProvider.getEnv().executeAsync();
+
+ KuduTable sinkTable = sinkClient.openTable(TABLE_NAME);
+ assertEventuallyTrue("Non-covered range table should be replicated",
+ () -> countRowsInTable(sinkTable) == 10, 60000);
+
+ verifyPartitionSchemasMatch();
+ verifyTestRowsEqual(10);
+ }
+
+ @Test
+ public void testUnpartitionedTable() throws Exception {
+ // Create table with no partitioning (single tablet)
+ Schema schema = createTestSchema();
+ CreateTableOptions options = new CreateTableOptions()
+ .setRangePartitionColumns(Collections.emptyList());
+
+ sourceClient.createTable(TABLE_NAME, schema, options);
+ insertTestRows(sourceClient, 0, 8);
+
+ envProvider.getEnv().executeAsync();
+
+ KuduTable sinkTable = sinkClient.openTable(TABLE_NAME);
+ assertEventuallyTrue("Unpartitioned table should be replicated",
+ () -> countRowsInTable(sinkTable) == 8, 60000);
+
+ verifyPartitionSchemasMatch();
+ verifyTestRowsEqual(8);
+ }
+
+ private Schema createTestSchema() {
+ List<ColumnSchema> columns = new ArrayList<>();
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("key1",
Type.INT32).key(true).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("key2",
Type.STRING).key(true).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("key3",
Type.INT32).key(true).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("data",
Type.STRING).build());
+ return new Schema(columns);
+ }
+
+ private void insertTestRows(org.apache.kudu.client.KuduClient client,
+ int startKey, int count) throws Exception {
+ KuduTable table = client.openTable(TABLE_NAME);
+ org.apache.kudu.client.KuduSession session = client.newSession();
+ for (int i = 0; i < count; i++) {
+ org.apache.kudu.client.Insert insert = table.newInsert();
+ PartialRow row = insert.getRow();
+ int key = startKey + i;
+ row.addInt("key1", key);
+ row.addString("key2", "val" + (key % 10));
+ row.addInt("key3", key * 2);
+ row.addString("data", "test data " + key);
+ session.apply(insert);
+ }
+ session.flush();
+ session.close();
+ }
+
+ private void verifyTestRowsEqual(int expectedRowCount) throws Exception {
+ KuduTable sourceTable = sourceClient.openTable(TABLE_NAME);
+ KuduTable sinkTable = sinkClient.openTable(TABLE_NAME);
+
+ assertEquals(String.format("Source should have %d rows", expectedRowCount),
+ expectedRowCount, countRowsInTable(sourceTable));
+ assertEquals(String.format("Sink should have %d rows", expectedRowCount),
+ expectedRowCount, countRowsInTable(sinkTable));
+
+ org.apache.kudu.client.KuduScanner sourceScanner =
+ sourceClient.newScannerBuilder(sourceTable).build();
+ org.apache.kudu.client.KuduScanner sinkScanner =
+ sinkClient.newScannerBuilder(sinkTable).build();
+
+ int sourceCount = 0;
+ int sinkCount = 0;
+ while (sourceScanner.hasMoreRows()) {
+ sourceCount += sourceScanner.nextRows().getNumRows();
+ }
+ while (sinkScanner.hasMoreRows()) {
+ sinkCount += sinkScanner.nextRows().getNumRows();
+ }
+
+ assertEquals(String.format("Row counts should match (source: %d, sink:
%d)",
+ sourceCount, sinkCount), sourceCount, sinkCount);
+ }
+
+ private void verifyPartitionSchemasMatch() throws Exception {
+ KuduTable sourceTable = sourceClient.openTable(TABLE_NAME);
+ KuduTable sinkTable = sinkClient.openTable(TABLE_NAME);
+
+ PartitionSchema sourceSchema = sourceTable.getPartitionSchema();
+ PartitionSchema sinkSchema = sinkTable.getPartitionSchema();
+
+ verifyTableWideHashSchemas(sourceSchema, sinkSchema);
+ verifyRangeSchema(sourceSchema, sinkSchema);
+ verifyRangePartitionsWithTableHashSchema(sourceTable, sinkTable);
+ verifyRangesWithCustomHashSchemas(sourceSchema, sinkSchema);
+ }
+
+ private void verifyTableWideHashSchemas(PartitionSchema sourceSchema,
+ PartitionSchema sinkSchema) {
+
+ assertEquals("Table-wide hash schemas should match exactly",
+ sourceSchema.getHashBucketSchemas(),
sinkSchema.getHashBucketSchemas());
+ }
+
+ private void verifyRangeSchema(PartitionSchema sourceSchema, PartitionSchema
sinkSchema)
+ throws Exception {
+ KuduTable sourceTable = sourceClient.openTable(TABLE_NAME);
+ KuduTable sinkTable = sinkClient.openTable(TABLE_NAME);
+
+ PartitionSchema.RangeSchema sourceRangeSchema =
sourceSchema.getRangeSchema();
+ PartitionSchema.RangeSchema sinkRangeSchema = sinkSchema.getRangeSchema();
+
+ List<String> sourceColumnNames = getColumnNamesFromColumnIds(
+ sourceRangeSchema.getColumnIds(), sourceTable);
+ List<String> sinkColumnNames = getColumnNamesFromColumnIds(
+ sinkRangeSchema.getColumnIds(), sinkTable);
+
+ assertEquals("Range schema column names should match",
+ sourceColumnNames, sinkColumnNames);
+ }
+
+ private List<String> getColumnNamesFromColumnIds(List<Integer> columnIds,
KuduTable table) {
+ List<String> columnNames = new ArrayList<>();
+ for (int id : columnIds) {
+ int idx = table.getSchema().getColumnIndex(id);
+ columnNames.add(table.getSchema().getColumnByIndex(idx).getName());
+ }
+ return columnNames;
+ }
+
+ private void verifyRangePartitionsWithTableHashSchema(KuduTable sourceTable,
KuduTable sinkTable)
+ throws Exception {
+ List<Partition> sourcePartitions =
sourceTable.getRangePartitionsWithTableHashSchema(10000);
+ List<Partition> sinkPartitions =
sinkTable.getRangePartitionsWithTableHashSchema(10000);
+ assertEquals("Range partitions with table hash schema should match
exactly",
+ sourcePartitions, sinkPartitions);
+ }
+
+ private void verifyRangesWithCustomHashSchemas(PartitionSchema sourceSchema,
+ PartitionSchema sinkSchema) {
+ List<PartitionSchema.RangeWithHashSchema> sourceRanges =
+ sourceSchema.getRangesWithHashSchemas();
+ List<PartitionSchema.RangeWithHashSchema> sinkRanges =
sinkSchema.getRangesWithHashSchemas();
+
+ // Convert to Sets for order-independent comparison
+ Set<PartitionSchema.RangeWithHashSchema> sourceSet = new
HashSet<>(sourceRanges);
+ Set<PartitionSchema.RangeWithHashSchema> sinkSet = new
HashSet<>(sinkRanges);
+
+ assertEquals("Ranges with custom hash schemas should match exactly",
+ sourceSet, sinkSet);
+ }
+
+
+}