This is an automated email from the ASF dual-hosted git repository.

mgreber pushed a commit to branch flink-replication
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 4cf118a9f5fc738efe02ccc6d53af64f69fea61c
Author: Marton Greber <[email protected]>
AuthorDate: Wed Mar 26 22:53:25 2025 +0100

    [replication] Replicate deletes and create the sink table from the source

    Replace CustomAbstractSingleOperationMapper with a RowKind-aware
    CustomReplicationOperationMapper: rows marked DELETE become DELETE_IGNORE
    operations, everything else becomes UPSERT_IGNORE. Add factories that read
    the source table's column schemas and create-table options (comment,
    replication factor, extra configs, hash and range partitioning), and use
    them in ReplicationJobExecutor to create the sink table when it does not
    exist. ReplicationJobConfig now implements Serializable.
---
 .../CustomAbstractSingleOperationMapper.java       | 65 ----------------------
 .../kudu/replication/CustomColumnSchemas.java      | 29 ++++++++++
 .../kudu/replication/CustomCreateTableOptions.java | 63 +++++++++++++++++++++
 .../CustomReplicationOperationMapper.java          | 42 ++++++++++++++
 .../CustomReplicationRowRestultConverter.java      | 28 ++++++++++
 .../kudu/replication/ReplicationJobConfig.java     |  3 +-
 .../kudu/replication/ReplicationJobExecutor.java   | 23 +++++---
 .../replication/ReplicationTableInitializer.java   |  4 ++
 .../apache/kudu/replication/TestReplication.java   |  7 ++-
 9 files changed, 187 insertions(+), 77 deletions(-)

diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomAbstractSingleOperationMapper.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomAbstractSingleOperationMapper.java
deleted file mode 100644
index 01681b0de..000000000
--- a/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomAbstractSingleOperationMapper.java
+++ /dev/null
@@ -1,65 +0,0 @@
-package org.apache.kudu.replication;
-
-import org.apache.flink.connector.kudu.connector.writer.KuduOperationMapper;
-import org.apache.flink.types.Row;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.Operation;
-import org.apache.kudu.client.PartialRow;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-
-public class CustomAbstractSingleOperationMapper implements KuduOperationMapper<Row> {
-
-  private final KuduOperation operation;
-
-
-  public CustomAbstractSingleOperationMapper(KuduOperation operation) {
-    this.operation = operation;
-  }
-
-  public Optional<Operation> createBaseOperation(Row input, KuduTable table) {
-    if (operation == null) {
-      throw new UnsupportedOperationException(
-          "createBaseOperation must be overridden if no operation specified in constructor");
-    }
-    switch (operation) {
-      case INSERT:
-        return Optional.of(table.newInsert());
-      case UPDATE:
-        return Optional.of(table.newUpdate());
-      case UPSERT:
-        return Optional.of(table.newUpsert());
-      case DELETE:
-        return Optional.of(table.newDelete());
-      default:
-        throw new RuntimeException("Unknown operation " + operation);
-    }
-  }
-
-  @Override
-  public List<Operation> createOperations(Row input, KuduTable table) {
-    Optional<Operation> operationOpt = createBaseOperation(input, table);
-    if (!operationOpt.isPresent()) {
-      return Collections.emptyList();
-    }
-
-    Operation operation = operationOpt.get();
-    PartialRow partialRow = operation.getRow();
-
-    for (int i = 0; i < input.getArity(); i++) {
-      partialRow.addObject(i, input.getField(i));
-    }
-
-    return Collections.singletonList(operation);
-  }
-
-  /** Kudu operation types. */
-  public enum KuduOperation {
-    INSERT,
-    UPDATE,
-    UPSERT,
-    DELETE
-  }
-}
\ No newline at end of file
diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomColumnSchemas.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomColumnSchemas.java
new file mode 100644
index 000000000..29f36a714
--- /dev/null
+++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomColumnSchemas.java
@@ -0,0 +1,29 @@
+package org.apache.kudu.replication;
+
+import org.apache.flink.connector.kudu.connector.ColumnSchemasFactory;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduTable;
+
+import java.util.List;
+
+public class CustomColumnSchemas implements ColumnSchemasFactory {
+  private final ReplicationJobConfig config;
+
+  public CustomColumnSchemas(ReplicationJobConfig config) {
+    this.config = config;
+  }
+
+  @Override
+  public List<ColumnSchema> getColumnSchemas() {
+    try {
+      KuduClient sourceClient = new KuduClient.KuduClientBuilder(String.join(",", config.getSourceMasterAddresses())).build();
+      KuduTable table = sourceClient.openTable(config.getTableName());
+      sourceClient.close();
+      return table.getSchema().getColumns();
+    } catch (KuduException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
\ No newline at end of file
diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomCreateTableOptions.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomCreateTableOptions.java
new file mode 100644
index 000000000..81d81f5b6
--- /dev/null
+++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomCreateTableOptions.java
@@ -0,0 +1,63 @@
+package org.apache.kudu.replication;
+
+import org.apache.flink.connector.kudu.connector.CreateTableOptionsFactory;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Partition;
+import org.apache.kudu.client.PartitionSchema;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class CustomCreateTableOptions implements CreateTableOptionsFactory {
+  private final ReplicationJobConfig config;
+
+  public CustomCreateTableOptions(ReplicationJobConfig config) {
+    this.config = config;
+  }
+  @Override
+  public org.apache.kudu.client.CreateTableOptions getCreateTableOptions() {
+    try {
+      KuduClient sourceClient = new KuduClient.KuduClientBuilder(String.join(",", config.getSourceMasterAddresses())).build();
+      KuduTable table = sourceClient.openTable(config.getTableName());
+      CreateTableOptions createTableOptions = new CreateTableOptions();
+
+      createTableOptions.setComment(table.getComment());
+      createTableOptions.setNumReplicas(table.getNumReplicas());
+      createTableOptions.setExtraConfigs(table.getExtraConfig());
+      PartitionSchema ps = table.getPartitionSchema();
+
+      // Add hash partitions into the mix
+      List<PartitionSchema.HashBucketSchema> hb = ps.getHashBucketSchemas();
+      for (PartitionSchema.HashBucketSchema hbSchema : hb) {
+        List<String> colNames = new ArrayList<>();
+        for (int id : hbSchema.getColumnIds()) {
+          int idx = table.getSchema().getColumnIndex(id);
+          colNames.add(table.getSchema().getColumnByIndex(idx).getName());
+        }
+        createTableOptions.addHashPartitions(colNames, hbSchema.getNumBuckets());
+      }
+
+      PartitionSchema.RangeSchema rangeSchema = ps.getRangeSchema();
+      List<String> colNames = new ArrayList<>();
+      for (int id : rangeSchema.getColumnIds()) {
+        int idx = table.getSchema().getColumnIndex(id);
+        colNames.add(table.getSchema().getColumnByIndex(idx).getName());
+      }
+      createTableOptions.setRangePartitionColumns(colNames);
+      List<Partition> ranges = table.getRangePartitions(9999);
+      for (Partition partition : ranges) {
+        createTableOptions.addRangePartition(partition.getDecodedRangeKeyStart(table), partition.getDecodedRangeKeyEnd(table));
+      }
+
+      sourceClient.close();
+      return createTableOptions;
+    } catch (RuntimeException | KuduException e) {
+      throw new RuntimeException(e);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
\ No newline at end of file
diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomReplicationOperationMapper.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomReplicationOperationMapper.java
new file mode 100644
index 000000000..466b7bcc3
--- /dev/null
+++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomReplicationOperationMapper.java
@@ -0,0 +1,42 @@
+package org.apache.kudu.replication;
+
+import org.apache.flink.connector.kudu.connector.writer.KuduOperationMapper;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+public class CustomReplicationOperationMapper implements KuduOperationMapper<Row> {
+
+  public Optional<Operation> createBaseOperation(Row input, KuduTable table) {
+    RowKind kind = input.getKind();
+    if (kind == RowKind.DELETE) {
+      return Optional.of(table.newDeleteIgnore());
+    } else {
+      return Optional.of(table.newUpsertIgnore());
+    }
+  }
+
+  @Override
+  public List<Operation> createOperations(Row input, KuduTable table) {
+    Optional<Operation> operationOpt = createBaseOperation(input, table);
+    if (!operationOpt.isPresent()) {
+      return Collections.emptyList();
+    }
+
+    Operation operation = operationOpt.get();
+    PartialRow partialRow = operation.getRow();
+
+    for (int i = 0; i < input.getArity(); i++) {
+      partialRow.addObject(i, input.getField(i));
+    }
+
+    return Collections.singletonList(operation);
+  }
+
+}
\ No newline at end of file
diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomReplicationRowRestultConverter.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomReplicationRowRestultConverter.java
new file mode 100644
index 000000000..16f08ef4e
--- /dev/null
+++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomReplicationRowRestultConverter.java
@@ -0,0 +1,28 @@
+package org.apache.kudu.replication;
+
+import org.apache.flink.connector.kudu.connector.converter.RowResultConverter;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.RowResult;
+
+public class CustomReplicationRowRestultConverter implements RowResultConverter<Row> {
+  public CustomReplicationRowRestultConverter() {
+  }
+
+  public Row convert(RowResult row) {
+    Schema schema = row.getColumnProjection();
+    Row values = new Row(schema.getColumnCount());
+    if (schema.hasIsDeleted()) {
+      values.setKind((RowKind.DELETE));
+    }
+    schema.getColumns().forEach((column) -> {
+      String name = column.getName();
+      int pos = schema.getColumnIndex(name);
+      if (!row.isNull(name)) {
+        values.setField(pos, row.getObject(name));
+      }
+    });
+    return values;
+  }
+}
diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
index 5f46b79c0..ca236babf 100644
--- a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
+++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
@@ -17,12 +17,13 @@
 
 package org.apache.kudu.replication;
 
+import java.io.Serializable;
 import java.util.List;
 
 /**
  * A configuration object for ReplicationJobs used for the Kudu Flink based replication.
  */
-public class ReplicationJobConfig {
+public class ReplicationJobConfig implements Serializable {
   private List<String> sourceMasterAddresses;
   private List<String> sinkMasterAddresses;
   private String tableName;
diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java
index d9d3c7a2f..9d1806c4f 100644
--- a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java
+++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.connector.kudu.connector.ColumnSchemasFactory;
+import org.apache.flink.connector.kudu.connector.CreateTableOptionsFactory;
 import org.apache.flink.connector.kudu.connector.KuduTableInfo;
 import org.apache.flink.connector.kudu.connector.converter.RowResultRowConverter;
 import org.apache.flink.connector.kudu.connector.reader.KuduInputSplit;
@@ -49,11 +50,16 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.Operation;
 import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.Partition;
+import org.apache.kudu.client.PartitionSchema;
 import org.apache.kudu.shaded.com.google.common.collect.Lists;
 
 import java.util.*;
@@ -73,12 +79,19 @@ class ReplicationJobExecutor {
    * Executes the actual business logic of the replication.
    */
   public void runJob() throws Exception {
-    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    KuduClient sinkClient = new KuduClient.KuduClientBuilder(String.join(",", config.getSinkMasterAddresses())).build();
+    KuduClient sourceClient = new KuduClient.KuduClientBuilder(String.join(",", config.getSourceMasterAddresses())).build();
+    if (!sinkClient.tableExists(config.getTableName())) {
+      Schema schema = sourceClient.openTable(config.getTableName()).getSchema();
+      sinkClient.createTable(config.getTableName(), schema, new CustomCreateTableOptions(config).getCreateTableOptions());
+    }
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
     KuduSink<Row> kuduSink = new KuduSinkBuilder<Row>()
         .setWriterConfig(getWriterConfig())
         .setTableInfo(getTableInfo())
-        .setOperationMapper(getRowOperationMapper())
+        .setOperationMapper(new CustomReplicationOperationMapper())
         .build();
 
     KuduSource<Row> kuduSource = new KuduSourceBuilder<Row>()
@@ -102,12 +115,6 @@ class ReplicationJobExecutor {
         .build();
   }
 
-  // TODO: figure out the mechanism to get this auto populated
-  // for now just got the column names hardcoed ~.~
-  private KuduOperationMapper getRowOperationMapper() {
-    return new CustomAbstractSingleOperationMapper(CustomAbstractSingleOperationMapper.KuduOperation.UPSERT);
-
-  }
 
   private KuduReaderConfig getReaderConfig() {
     return KuduReaderConfig.Builder
diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationTableInitializer.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationTableInitializer.java
new file mode 100644
index 000000000..525f2dbd2
--- /dev/null
+++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationTableInitializer.java
@@ -0,0 +1,4 @@
+package org.apache.kudu.replication;
+
+public class ReplicationTableInitializer {
+}
diff --git a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java
index 16f1364dd..e5f23477a 100644
--- a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java
+++ b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java
@@ -43,7 +43,7 @@ public class TestReplication {
     //Setup source table
     try {
       createTableWithOneThousandRows(
-          this.sourceHarness.getAsyncClient(), TABLE_NAME, 32 * 1024, DEFAULT_SLEEP);
+          this.sourceHarness.getAsyncClient(), TABLE_NAME, 10, DEFAULT_SLEEP);
     } catch (Exception e) {
       e.printStackTrace();
       LOG.error(e.getMessage());
@@ -52,7 +52,7 @@ public class TestReplication {
 
     // We create the sink table here, temporary workaround to get things up and running.
     // TODO: ideally we should not need to do this.
-    KuduTable sinkTable = createDefaultTable(this.sinkHarness.getClient(), TABLE_NAME);
+//    KuduTable sinkTable = createDefaultTable(this.sinkHarness.getClient(), TABLE_NAME);
 
     ReplicationJobConfig config = new ReplicationJobConfig();
 
@@ -63,7 +63,8 @@ public class TestReplication {
     ReplicationJobExecutor executor = new ReplicationJobExecutor(config);
     executor.runJob();
 
-    sinkTable = sinkClient.openTable(TABLE_NAME);
+//    sinkTable = sinkClient.openTable(TABLE_NAME);
+    KuduTable sinkTable = sinkClient.openTable(TABLE_NAME);
     assertEquals(1000, countRowsInTable(sinkTable));
   }
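
To make the mapper change above concrete, here is a minimal, self-contained sketch (not part of the commit) of how the new CustomReplicationOperationMapper behaves. The master address "localhost:7051", the table name "events" (assumed to have an INT32 key column and one STRING column), and the MapperSketch class are hypothetical names invented for this example.

package org.apache.kudu.replication;

import java.util.List;

import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;

public class MapperSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical master address and table name, for illustration only.
    try (KuduClient client = new KuduClient.KuduClientBuilder("localhost:7051").build()) {
      KuduTable table = client.openTable("events");
      CustomReplicationOperationMapper mapper = new CustomReplicationOperationMapper();

      // A source-side delete arrives as a Row with RowKind.DELETE and is
      // mapped to a DELETE_IGNORE operation, which is a no-op if the row
      // is already gone on the sink.
      Row deleted = Row.ofKind(RowKind.DELETE, 1, "payload");
      List<Operation> deleteOps = mapper.createOperations(deleted, table);

      // Every other RowKind becomes an UPSERT_IGNORE, so re-reading the
      // same source rows after a job restart leaves the sink unchanged.
      Row inserted = Row.ofKind(RowKind.INSERT, 1, "payload");
      List<Operation> upsertOps = mapper.createOperations(inserted, table);

      System.out.println(deleteOps.size() + " delete op, " + upsertOps.size() + " upsert op");
    }
  }
}

The ignore variants are what make the pipeline tolerant of replays: a replayed delete cannot fail on a missing row, and a replayed upsert cannot duplicate an existing one.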
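
A note on the two new factories: ColumnSchemasFactory and CreateTableOptionsFactory are the interfaces the Flink Kudu connector's KuduTableInfo accepts for lazy table creation. The following is a hedged sketch of how they could be wired through the connector's createTableIfNotExists API; the commit's own getTableInfo() body is not visible in this diff, so this wiring is an assumption, not the commit's code.

// Assumed wiring via the connector's KuduTableInfo; not shown in the diff.
KuduTableInfo tableInfo = KuduTableInfo.forTable(config.getTableName())
    .createTableIfNotExists(
        new CustomColumnSchemas(config),       // column schemas read from the source table
        new CustomCreateTableOptions(config)); // replicas plus hash/range partitioning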
