This is an automated email from the ASF dual-hosted git repository. mgreber pushed a commit to branch flink-replication in repository https://gitbox.apache.org/repos/asf/kudu.git
commit d7d80687ec9214500ace5463756627153ae8e7cb Author: zchovan <[email protected]> AuthorDate: Tue Jan 7 13:52:09 2025 +0100 Replace Apache Bahir with flink-connector This commit replaces the Apache Bahir dependency with the Flink Kudu connector. Change-Id: Ia1b8d1fb504908d2fc1f693fd3a08a578d72cabe --- java/build.gradle | 5 +++++ java/gradle/dependencies.gradle | 4 ++-- .../apache/kudu/replication/ReplicationJob.java | 4 ++++ ...eplicationHarness.java => TestReplication.java} | 25 ++++++++++++++-------- 4 files changed, 27 insertions(+), 11 deletions(-) diff --git a/java/build.gradle b/java/build.gradle index ea8f68930..a8eb67c55 100755 --- a/java/build.gradle +++ b/java/build.gradle @@ -36,6 +36,11 @@ allprojects { repositories { mavenCentral() mavenLocal() + // TODO(zchovan): remove this, once the kudu connector is published upstream + // temporarily add downstream nexus + maven() { + url = uri("https://nexus-private.infra.cloudera.com/nexus/") + } } // Read the version.txt file to set the project version diff --git a/java/gradle/dependencies.gradle b/java/gradle/dependencies.gradle index 354dad276..688bdf8a2 100755 --- a/java/gradle/dependencies.gradle +++ b/java/gradle/dependencies.gradle @@ -26,7 +26,7 @@ ext { versions += [ async : "1.4.1", - bahir : "1.1.0", + bahir : "2.0-SNAPSHOT", checkstyle : "8.36.1", clojure : "1.10.3", clojureToolsCli: "1.0.206", @@ -97,7 +97,7 @@ libs += [ flinkConnectorBase : "org.apache.flink:flink-connector-base:$versions.flink", flinkCore : "org.apache.flink:flink-core:$versions.flink", flinkDist : "org.apache.flink:flink-dist:$versions.flink", - flinkKuduConnector : "org.apache.bahir:flink-connector-kudu_2.11:$versions.bahir", + flinkKuduConnector : "org.apache.flink:flink-connector-kudu:$versions.bahir", flinkStreamingJava : "org.apache.flink:flink-streaming-java:$versions.flink", guava : "com.google.guava:guava:$versions.guava", hadoopClient : "org.apache.hadoop:hadoop-client:$versions.hadoop", diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java index 0cb08e181..1c83fd9b2 100644 --- a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java +++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java @@ -22,10 +22,14 @@ import org.apache.flink.api.java.utils.ParameterTool; import java.util.ArrayList; import java.util.Collections; +/** + * This class is used to submit Kudu Replication Jobs into a Flink Cluster. + */ public class ReplicationJob { static ReplicationJobConfig config; public static void main(String[] args) throws Exception { + // Read parameters from standard args, then convert them into a ReplicationJobConfig object. ParameterTool parameters = ParameterTool.fromArgs(args); config.setSourceMasterAddresses(new ArrayList<>( diff --git a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationHarness.java b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java similarity index 69% rename from java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationHarness.java rename to java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java index 8ee06510a..d62a9209d 100644 --- a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationHarness.java +++ b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java @@ -1,7 +1,5 @@ package org.apache.kudu.replication; -import org.apache.kudu.Schema; -import org.apache.kudu.client.CreateTableOptions; import org.apache.kudu.client.KuduClient; import org.apache.kudu.test.KuduTestHarness; import org.junit.Before; @@ -12,11 +10,14 @@ import org.slf4j.LoggerFactory; import java.util.*; +import static org.apache.kudu.test.ClientTestUtil.createTableWithOneThousandRows; import static org.apache.kudu.test.ClientTestUtil.getBasicSchema; +import static org.apache.kudu.test.KuduTestHarness.DEFAULT_SLEEP; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; -public class TestReplicationHarness { - private static final Logger LOG = LoggerFactory.getLogger(TestReplicationHarness.class); +public class TestReplication { + private static final Logger LOG = LoggerFactory.getLogger(TestReplication.class); private static final String TABLE_NAME = "replication_test_table"; @Rule @@ -29,16 +30,22 @@ public class TestReplicationHarness { @Before public void setup() { - CreateTableOptions builder = new CreateTableOptions().setNumReplicas(3); - Schema basicSchema = getBasicSchema(); - this.sourceClient = sourceHarness.getClient(); this.sinkClient = sinkHarness.getClient(); - } @Test - public void BenchmarkReplicationScenario() { + public void TestBasicReplication() { + //Setup source table + try { + createTableWithOneThousandRows( + this.sourceHarness.getAsyncClient(), TABLE_NAME, 32 * 1024, DEFAULT_SLEEP); + } catch (Exception e) { + e.printStackTrace(); + LOG.error(e.getMessage()); + fail(e.getMessage()); + } + ReplicationJobConfig config = new ReplicationJobConfig(); config.setSourceMasterAddresses(Arrays.asList(sourceHarness.getMasterAddressesAsString().split(",")));
