This is an automated email from the ASF dual-hosted git repository.

mgreber pushed a commit to branch flink-replication
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 0bfcb69360a7597ea90d21f58076199aa9510f2b
Author: zchovan <[email protected]>
AuthorDate: Mon Feb 3 14:26:49 2025 +0100

    updated replication job
    
    Change-Id: I4a33c895ee05f417026b1a41bc216b1ae46c0f98
---
 .../org/apache/kudu/replication/KuduSource.java    |   5 +
 .../apache/kudu/replication/ReplicationJob.java    |   9 +-
 .../kudu/replication/ReplicationJobConfig.java     |  17 ++-
 .../kudu/replication/ReplicationJobExecutor.java   | 165 ++++++++++++++++++++-
 .../apache/kudu/replication/TestReplication.java   |  23 +++
 5 files changed, 212 insertions(+), 7 deletions(-)

diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/KuduSource.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/KuduSource.java
index eeb616903..3a74aa71d 100644
--- a/java/kudu-replication/src/main/java/org/apache/kudu/replication/KuduSource.java
+++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/KuduSource.java
@@ -20,11 +20,16 @@ package org.apache.kudu.replication;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.kudu.client.*;
 import org.apache.kudu.util.HybridTimeUtil;
 
+/**
+ * Flink Source class for Kudu.
+ */
+@PublicEvolving
 public class KuduSource extends RichSourceFunction<String> {
     private final List<String> kuduMasters;
     private final String tableName;
diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java
index 1c83fd9b2..ba41d4ca8 100644
--- a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java
+++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java
@@ -18,16 +18,23 @@
 package org.apache.kudu.replication;
 
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.connector.kudu.sink.KuduSink;
 
 import java.util.ArrayList;
 import java.util.Collections;
 
 /**
- * This class is used to submit Kudu Replication Jobs into a Flink Cluster.
+ * This class is used to submit Kudu Replication Jobs into a Flink Cluster. This is a wrapper for convenience.
  */
 public class ReplicationJob {
     static ReplicationJobConfig config;
 
+    /**
+     * Generic entry point of the replication job.
+     *
+     * @param args
+     * @throws Exception
+     */
     public static void main(String[] args) throws Exception {
         // Read parameters from standard args, then convert them into a ReplicationJobConfig object.
         ParameterTool parameters = ParameterTool.fromArgs(args);
diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
index 52c644b60..5f46b79c0 100644
--- a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
+++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
@@ -19,6 +19,9 @@ package org.apache.kudu.replication;
 
 import java.util.List;
 
+/**
+ * A configuration object for ReplicationJobs used for the Kudu Flink based replication.
+ */
 public class ReplicationJobConfig {
     private List<String> sourceMasterAddresses;
     private List<String> sinkMasterAddresses;
@@ -28,20 +31,32 @@ public class ReplicationJobConfig {
         return sourceMasterAddresses;
     }
 
+    /**
+     * @param sourceMasterAddresses
+     */
     public void setSourceMasterAddresses(List<String> sourceMasterAddresses) {
         this.sourceMasterAddresses = sourceMasterAddresses;
     }
 
+    /**
+     * @return List of master addresses in host:port format.
+     */
     public List<String> getSinkMasterAddresses() {
         return sinkMasterAddresses;
     }
 
+    /**
+     * @param sinkMasterAddresses
+     */
     public void setSinkMasterAddresses(List<String> sinkMasterAddresses) {
         this.sinkMasterAddresses = sinkMasterAddresses;
     }
 
+    /**
+     * @return Name of the table the configuration is created for.
+     */
     public String getTableName() {
-        return tableName;
+        return this.tableName;
     }
 
     public void setTableName(String tableName) {
diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java
index 6ad63ae09..c5d568ace 100644
--- a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java
+++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java
@@ -17,27 +17,182 @@
 
 package org.apache.kudu.replication;
 
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.kudu.connector.ColumnSchemasFactory;
+import org.apache.flink.connector.kudu.connector.KuduTableInfo;
+import 
org.apache.flink.connector.kudu.connector.converter.RowResultRowConverter;
+import org.apache.flink.connector.kudu.connector.reader.KuduInputSplit;
+import org.apache.flink.connector.kudu.connector.reader.KuduReader;
+import org.apache.flink.connector.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connector.kudu.connector.reader.KuduReaderIterator;
+import 
org.apache.flink.connector.kudu.connector.writer.AbstractSingleOperationMapper;
+import org.apache.flink.connector.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.connector.kudu.connector.writer.RowOperationMapper;
+import org.apache.flink.connector.kudu.sink.KuduSink;
+import org.apache.flink.connector.kudu.sink.KuduSinkBuilder;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.shaded.com.google.common.collect.Lists;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Class for executing the replication actions.
+ */
 class ReplicationJobExecutor {
-    ReplicationJobConfig config;
+    private ReplicationJobConfig config;
+    private final String[] columns = new String[] {"id", "title", "author", 
"price", "quantity"};
+    private final Object[][] booksTableData = {
+            {1001, "Java for dummies", "Tan Ah Teck", 11.11, 11},
+            {1002, "More Java for dummies", "Tan Ah Teck", 22.22, 22},
+            {1003, "More Java for more dummies", "Mohammad Ali", 33.33, 33},
+            {1004, "A Cup of Java", "Kumar", 44.44, 44},
+            {1005, "A Teaspoon of Java", "Kevin Jones", 55.55, 55}
+    };
+
+    private List<Row> booksDataRow;
 
     ReplicationJobExecutor(ReplicationJobConfig config) {
         this.config = config;
+        this.booksDataRow =  Arrays.stream(booksTableData)
+            .map(
+                row -> {
+                    Integer rowId = (Integer) row[0];
+                    if (rowId % 2 == 1) {
+                        Row values = new Row(5);
+                        values.setField(0, row[0]);
+                        values.setField(1, row[1]);
+                        values.setField(2, row[2]);
+                        values.setField(3, row[3]);
+                        values.setField(4, row[4]);
+                        return values;
+                    } else {
+                        Row values = new Row(5);
+                        values.setField(0, row[0]);
+                        values.setField(1, row[1]);
+                        values.setField(2, row[2]);
+                        return values;
+                    }
+                })
+            .collect(Collectors.toList());
     }
 
+    /**
+     * Executes the actual business logic of the replication.
+     */
     public void runJob() {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        DataStream<String> kuduSourceStream = env.addSource(
-                new KuduSource(config.getSourceMasterAddresses(), 
config.getTableName()));
+//        DataStream<String> kuduSourceStream = env.addSource(
+//                new KuduSource(config.getSourceMasterAddresses(), config.getTableName()));
+
+        KuduWriterConfig writerConfig =
+                KuduWriterConfig.Builder
+                        .setMasters(String.join(",", 
config.getSinkMasterAddresses()))
+                        .setStrongConsistency()
+                        .build();
+
+        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), 
true);
+
+        KuduSink<Row> sink =
+                KuduSink.<Row>builder()
+                        .setWriterConfig(writerConfig)
+                        .setTableInfo(tableInfo)
+                        .setOperationMapper(initOperationMapper(columns))
+                        .build();
 
-        kuduSourceStream.print();
 
         try {
-            env.execute("Kudu Source Job");
+            SinkWriter<Row> writer = sink.createWriter((Sink.InitContext) 
null);
+
+            for (Row kuduRow : booksDataRow) {
+                writer.write(kuduRow, null);
+            }
+            writer.close();
+
+            List<Row> rows = readRows(tableInfo, String.join(",", 
config.getSinkMasterAddresses()));
+
+            env.execute("Kudu Test Job");
         } catch (Exception e) {
             System.out.println(e.getMessage());
         }
     }
+
+
+    // everything below this is just temp helper stuff
+    private RowOperationMapper initOperationMapper(String[] cols) {
+        return new RowOperationMapper(cols, 
AbstractSingleOperationMapper.KuduOperation.INSERT);
+    }
+
+    public static KuduTableInfo booksTableInfo(String tableName, boolean 
createIfNotExist) {
+
+        KuduTableInfo tableInfo = KuduTableInfo.forTable(tableName);
+
+        if (createIfNotExist) {
+            ColumnSchemasFactory schemasFactory =
+                    () -> {
+                        List<ColumnSchema> schemas = new ArrayList<>();
+                        schemas.add(
+                                new ColumnSchema.ColumnSchemaBuilder("id", 
Type.INT32)
+                                        .key(true)
+                                        .build());
+                        schemas.add(
+                                new ColumnSchema.ColumnSchemaBuilder("title", 
Type.STRING).build());
+                        schemas.add(
+                                new ColumnSchema.ColumnSchemaBuilder("author", 
Type.STRING)
+                                        .build());
+                        schemas.add(
+                                new ColumnSchema.ColumnSchemaBuilder("price", 
Type.DOUBLE)
+                                        .nullable(true)
+                                        .build());
+                        schemas.add(
+                                new 
ColumnSchema.ColumnSchemaBuilder("quantity", Type.INT32)
+                                        .nullable(true)
+                                        .build());
+                        return schemas;
+                    };
+
+            tableInfo.createTableIfNotExists(
+                    schemasFactory,
+                    () ->
+                            new CreateTableOptions()
+                                    .setNumReplicas(1)
+                                    
.addHashPartitions(Lists.newArrayList("id"), 2));
+        }
+
+        return tableInfo;
+    }
+
+    protected List<Row> readRows(KuduTableInfo tableInfo, String 
masterAddresses) throws Exception {
+        KuduReaderConfig readerConfig =
+                KuduReaderConfig.Builder.setMasters(masterAddresses).build();
+        KuduReader<Row> reader =
+                new KuduReader<>(tableInfo, readerConfig, new 
RowResultRowConverter());
+
+        KuduInputSplit[] splits = reader.createInputSplits(1);
+        List<Row> rows = new ArrayList<>();
+        for (KuduInputSplit split : splits) {
+            KuduReaderIterator<Row> resultIterator = 
reader.scanner(split.getScanToken());
+            while (resultIterator.hasNext()) {
+                Row row = resultIterator.next();
+                if (row != null) {
+                    rows.add(row);
+                }
+            }
+        }
+        reader.close();
+
+        return rows;
+    }
+
+
+
 }
diff --git a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java
index d62a9209d..869f38e2e 100644
--- a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java
+++ b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java
@@ -58,4 +58,27 @@ public class TestReplication {
         assertTrue(true);
     }
 
+    @Test
+    @KuduTestHarness.EnableKerberos
+    public void TestReplicationWithKerberos() {
+        try {
+            createTableWithOneThousandRows(
+                    this.sourceHarness.getAsyncClient(), TABLE_NAME, 32 * 
1024, DEFAULT_SLEEP);
+        } catch (Exception e) {
+            e.printStackTrace();
+            LOG.error(e.getMessage());
+            fail(e.getMessage());
+        }
+
+        ReplicationJobConfig config = new ReplicationJobConfig();
+
+        
config.setSourceMasterAddresses(Arrays.asList(sourceHarness.getMasterAddressesAsString().split(",")));
+        
config.setSinkMasterAddresses(Arrays.asList(sinkHarness.getMasterAddressesAsString().split(",")));
+        config.setTableName(TABLE_NAME);
+
+        ReplicationJobExecutor executor = new ReplicationJobExecutor(config);
+        executor.runJob();
+
+        assertTrue(true);
+    }
 }

Reply via email to