This is an automated email from the ASF dual-hosted git repository. mgreber pushed a commit to branch flink-replication in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 0bfcb69360a7597ea90d21f58076199aa9510f2b Author: zchovan <[email protected]> AuthorDate: Mon Feb 3 14:26:49 2025 +0100 updated replication job Change-Id: I4a33c895ee05f417026b1a41bc216b1ae46c0f98 --- .../org/apache/kudu/replication/KuduSource.java | 5 + .../apache/kudu/replication/ReplicationJob.java | 9 +- .../kudu/replication/ReplicationJobConfig.java | 17 ++- .../kudu/replication/ReplicationJobExecutor.java | 165 ++++++++++++++++++++- .../apache/kudu/replication/TestReplication.java | 23 +++ 5 files changed, 212 insertions(+), 7 deletions(-) diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/KuduSource.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/KuduSource.java index eeb616903..3a74aa71d 100644 --- a/java/kudu-replication/src/main/java/org/apache/kudu/replication/KuduSource.java +++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/KuduSource.java @@ -20,11 +20,16 @@ package org.apache.kudu.replication; import java.util.List; import java.util.concurrent.TimeUnit; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.kudu.client.*; import org.apache.kudu.util.HybridTimeUtil; +/** + * Flink Source class for Kudu. + */ +@PublicEvolving public class KuduSource extends RichSourceFunction<String> { private final List<String> kuduMasters; private final String tableName; diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java index 1c83fd9b2..ba41d4ca8 100644 --- a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java +++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java @@ -18,16 +18,23 @@ package org.apache.kudu.replication; import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.connector.kudu.sink.KuduSink; import java.util.ArrayList; import java.util.Collections; /** - * This class is used to submit Kudu Replication Jobs into a Flink Cluster. + * This class is used to submit Kudu Replication Jobs into a Flink Cluster. This is a wrapper for convenience. */ public class ReplicationJob { static ReplicationJobConfig config; + /** + * Generic entry point of the replication job. + * + * @param args + * @throws Exception + */ public static void main(String[] args) throws Exception { // Read parameters from standard args, then convert them into a ReplicationJobConfig object. ParameterTool parameters = ParameterTool.fromArgs(args); diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java index 52c644b60..5f46b79c0 100644 --- a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java +++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java @@ -19,6 +19,9 @@ package org.apache.kudu.replication; import java.util.List; +/** + * A configuration object for ReplicationJobs used for the Kudu Flink based replication. + */ public class ReplicationJobConfig { private List<String> sourceMasterAddresses; private List<String> sinkMasterAddresses; @@ -28,20 +31,32 @@ public class ReplicationJobConfig { return sourceMasterAddresses; } + /** + * @param sourceMasterAddresses + */ public void setSourceMasterAddresses(List<String> sourceMasterAddresses) { this.sourceMasterAddresses = sourceMasterAddresses; } + /** + * @return List of master addresses in host:port format. + */ public List<String> getSinkMasterAddresses() { return sinkMasterAddresses; } + /** + * @param sinkMasterAddresses + */ public void setSinkMasterAddresses(List<String> sinkMasterAddresses) { this.sinkMasterAddresses = sinkMasterAddresses; } + /** + * @return Name of the table the configuration is created for. + */ public String getTableName() { - return tableName; + return this.tableName; } public void setTableName(String tableName) { diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java index 6ad63ae09..c5d568ace 100644 --- a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java +++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java @@ -17,27 +17,182 @@ package org.apache.kudu.replication; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.connector.kudu.connector.ColumnSchemasFactory; +import org.apache.flink.connector.kudu.connector.KuduTableInfo; +import org.apache.flink.connector.kudu.connector.converter.RowResultRowConverter; +import org.apache.flink.connector.kudu.connector.reader.KuduInputSplit; +import org.apache.flink.connector.kudu.connector.reader.KuduReader; +import org.apache.flink.connector.kudu.connector.reader.KuduReaderConfig; +import org.apache.flink.connector.kudu.connector.reader.KuduReaderIterator; +import org.apache.flink.connector.kudu.connector.writer.AbstractSingleOperationMapper; +import org.apache.flink.connector.kudu.connector.writer.KuduWriterConfig; +import org.apache.flink.connector.kudu.connector.writer.RowOperationMapper; +import org.apache.flink.connector.kudu.sink.KuduSink; +import org.apache.flink.connector.kudu.sink.KuduSinkBuilder; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.types.Row; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Type; +import org.apache.kudu.client.CreateTableOptions; +import org.apache.kudu.shaded.com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +/** + * Class for executing the replication actions. + */ class ReplicationJobExecutor { - ReplicationJobConfig config; + private ReplicationJobConfig config; + private final String[] columns = new String[] {"id", "title", "author", "price", "quantity"}; + private final Object[][] booksTableData = { + {1001, "Java for dummies", "Tan Ah Teck", 11.11, 11}, + {1002, "More Java for dummies", "Tan Ah Teck", 22.22, 22}, + {1003, "More Java for more dummies", "Mohammad Ali", 33.33, 33}, + {1004, "A Cup of Java", "Kumar", 44.44, 44}, + {1005, "A Teaspoon of Java", "Kevin Jones", 55.55, 55} + }; + + private List<Row> booksDataRow; ReplicationJobExecutor(ReplicationJobConfig config) { this.config = config; + this.booksDataRow = Arrays.stream(booksTableData) + .map( + row -> { + Integer rowId = (Integer) row[0]; + if (rowId % 2 == 1) { + Row values = new Row(5); + values.setField(0, row[0]); + values.setField(1, row[1]); + values.setField(2, row[2]); + values.setField(3, row[3]); + values.setField(4, row[4]); + return values; + } else { + Row values = new Row(5); + values.setField(0, row[0]); + values.setField(1, row[1]); + values.setField(2, row[2]); + return values; + } + }) + .collect(Collectors.toList()); } + /** + * Executes the actual business logic of the replication. + */ public void runJob() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - DataStream<String> kuduSourceStream = env.addSource( - new KuduSource(config.getSourceMasterAddresses(), config.getTableName())); +// DataStream<String> kuduSourceStream = env.addSource( +// new KuduSource(config.getSourceMasterAddresses(), config.getTableName())); + + KuduWriterConfig writerConfig = + KuduWriterConfig.Builder + .setMasters(String.join(",", config.getSinkMasterAddresses())) + .setStrongConsistency() + .build(); + + KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true); + + KuduSink<Row> sink = + KuduSink.<Row>builder() + .setWriterConfig(writerConfig) + .setTableInfo(tableInfo) + .setOperationMapper(initOperationMapper(columns)) + .build(); - kuduSourceStream.print(); try { - env.execute("Kudu Source Job"); + SinkWriter<Row> writer = sink.createWriter((Sink.InitContext) null); + + for (Row kuduRow : booksDataRow) { + writer.write(kuduRow, null); + } + writer.close(); + + List<Row> rows = readRows(tableInfo, String.join(",", config.getSinkMasterAddresses())); + + env.execute("Kudu Test Job"); } catch (Exception e) { System.out.println(e.getMessage()); } } + + + // everything below this is just temp helper stuff + private RowOperationMapper initOperationMapper(String[] cols) { + return new RowOperationMapper(cols, AbstractSingleOperationMapper.KuduOperation.INSERT); + } + + public static KuduTableInfo booksTableInfo(String tableName, boolean createIfNotExist) { + + KuduTableInfo tableInfo = KuduTableInfo.forTable(tableName); + + if (createIfNotExist) { + ColumnSchemasFactory schemasFactory = + () -> { + List<ColumnSchema> schemas = new ArrayList<>(); + schemas.add( + new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32) + .key(true) + .build()); + schemas.add( + new ColumnSchema.ColumnSchemaBuilder("title", Type.STRING).build()); + schemas.add( + new ColumnSchema.ColumnSchemaBuilder("author", Type.STRING) + .build()); + schemas.add( + new ColumnSchema.ColumnSchemaBuilder("price", Type.DOUBLE) + .nullable(true) + .build()); + schemas.add( + new ColumnSchema.ColumnSchemaBuilder("quantity", Type.INT32) + .nullable(true) + .build()); + return schemas; + }; + + tableInfo.createTableIfNotExists( + schemasFactory, + () -> + new CreateTableOptions() + .setNumReplicas(1) + .addHashPartitions(Lists.newArrayList("id"), 2)); + } + + return tableInfo; + } + + protected List<Row> readRows(KuduTableInfo tableInfo, String masterAddresses) throws Exception { + KuduReaderConfig readerConfig = + KuduReaderConfig.Builder.setMasters(masterAddresses).build(); + KuduReader<Row> reader = + new KuduReader<>(tableInfo, readerConfig, new RowResultRowConverter()); + + KuduInputSplit[] splits = reader.createInputSplits(1); + List<Row> rows = new ArrayList<>(); + for (KuduInputSplit split : splits) { + KuduReaderIterator<Row> resultIterator = reader.scanner(split.getScanToken()); + while (resultIterator.hasNext()) { + Row row = resultIterator.next(); + if (row != null) { + rows.add(row); + } + } + } + reader.close(); + + return rows; + } + + + } diff --git a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java index d62a9209d..869f38e2e 100644 --- a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java +++ b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java @@ -58,4 +58,27 @@ public class TestReplication { assertTrue(true); } + @Test + @KuduTestHarness.EnableKerberos + public void TestReplicationWithKerberos() { + try { + createTableWithOneThousandRows( + this.sourceHarness.getAsyncClient(), TABLE_NAME, 32 * 1024, DEFAULT_SLEEP); + } catch (Exception e) { + e.printStackTrace(); + LOG.error(e.getMessage()); + fail(e.getMessage()); + } + + ReplicationJobConfig config = new ReplicationJobConfig(); + + config.setSourceMasterAddresses(Arrays.asList(sourceHarness.getMasterAddressesAsString().split(","))); + config.setSinkMasterAddresses(Arrays.asList(sinkHarness.getMasterAddressesAsString().split(","))); + config.setTableName(TABLE_NAME); + + ReplicationJobExecutor executor = new ReplicationJobExecutor(config); + executor.runJob(); + + assertTrue(true); + } }
