This is an automated email from the ASF dual-hosted git repository. mgreber pushed a commit to branch flink-replication in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 44fc8f31ba28af7c59e9fdae37a27e5d6d9f92e5 Author: zchovan <[email protected]> AuthorDate: Mon Nov 18 15:19:13 2024 +0100 [java] init kudu-replication subproject This change introduces a new gradle subproject 'kudu-replication'. This subproject contains the new Kudu replication Flink job and its related resources/dependencies. In this commit: * new Java dependencies were added, for Flink and its Kudu connector: * org.apache.flink:flink-clients * org.apache.flink:flink-connector-base * org.apache.flink:flink-core * org.apache.flink:flink-dist * org.apache.bahir:flink-connector-kudu_2.11 * org.apache.flink:flink-streaming-java * created a new Java class 'ReplicationJob' that is executable by Flink * created a new configuration class for the ReplicationJob * created an initial test setup for replication Change-Id: Idf2dfee6e6458aa4a559698792e6a1139e67f338 --- java/gradle/dependencies.gradle | 10 ++ .../build.gradle} | 33 +++--- .../org/apache/kudu/replication/KuduSource.java | 127 +++++++++++++++++++++ .../apache/kudu/replication/ReplicationJob.java | 41 +++++++ .../kudu/replication/ReplicationJobConfig.java | 50 ++++++++ .../kudu/replication/ReplicationJobExecutor.java} | 39 ++++--- .../kudu/replication/TestReplicationHarness.java | 54 +++++++++ .../src/test/resources/log4j2.properties | 32 ++++++ java/settings.gradle | 1 + 9 files changed, 359 insertions(+), 28 deletions(-) diff --git a/java/gradle/dependencies.gradle b/java/gradle/dependencies.gradle index d0015dd4b..354dad276 100755 --- a/java/gradle/dependencies.gradle +++ b/java/gradle/dependencies.gradle @@ -26,12 +26,14 @@ ext { versions += [ async : "1.4.1", + bahir : "1.1.0", checkstyle : "8.36.1", clojure : "1.10.3", clojureToolsCli: "1.0.206", commonsIo : "2.11.0", errorProne : "2.3.3", errorProneJavac: "9+181-r4173-1", + flink : "1.19.1", gradle : "7.6.4", guava : "32.1.1-jre", hadoop : "3.3.1", @@ -62,6 +64,7 @@ versions += [ spark2 : "2.4.8", spark : "3.2.4", spotBugs : "4.1.1", + testcontainers :
"1.20.3", yetus : "0.13.0" ] @@ -90,6 +93,12 @@ libs += [ commonsIo : "commons-io:commons-io:$versions.commonsIo", errorProne : "com.google.errorprone:error_prone_core:$versions.errorProne", errorProneJavac : "com.google.errorprone:javac:$versions.errorProneJavac", + flinkClients : "org.apache.flink:flink-clients:$versions.flink", + flinkConnectorBase : "org.apache.flink:flink-connector-base:$versions.flink", + flinkCore : "org.apache.flink:flink-core:$versions.flink", + flinkDist : "org.apache.flink:flink-dist:$versions.flink", + flinkKuduConnector : "org.apache.bahir:flink-connector-kudu_2.11:$versions.bahir", + flinkStreamingJava : "org.apache.flink:flink-streaming-java:$versions.flink", guava : "com.google.guava:guava:$versions.guava", hadoopClient : "org.apache.hadoop:hadoop-client:$versions.hadoop", hadoopCommon : "org.apache.hadoop:hadoop-common:$versions.hadoop", @@ -138,5 +147,6 @@ libs += [ sparkCore : "org.apache.spark:spark-core_$versions.scalaBase:$versions.spark", sparkSql : "org.apache.spark:spark-sql_$versions.scalaBase:$versions.spark", sparkSqlTest : "org.apache.spark:spark-sql_$versions.scalaBase:$versions.spark:tests", + testcontainers : "org.testcontainers:testcontainers:$versions.testcontainers", yetusAnnotations : "org.apache.yetus:audience-annotations:$versions.yetus" ] diff --git a/java/settings.gradle b/java/kudu-replication/build.gradle similarity index 53% copy from java/settings.gradle copy to java/kudu-replication/build.gradle index ebe587ab8..9ba64bbc8 100644 --- a/java/settings.gradle +++ b/java/kudu-replication/build.gradle @@ -15,18 +15,23 @@ // specific language governing permissions and limitations // under the License. -// This file contains the configuration of the project hierarchy. -// Mainly we just define what subprojects are in the build. 
+apply from: "$rootDir/gradle/shadow.gradle" -rootProject.name = "kudu-parent" -include "kudu-backup" -include "kudu-backup-common" -include "kudu-backup-tools" -include "kudu-client" -include "kudu-hive" -include "kudu-jepsen" -include "kudu-proto" -include "kudu-spark" -include "kudu-spark-tools" -include "kudu-subprocess" -include "kudu-test-utils" +dependencies { + implementation libs.flinkKuduConnector + implementation libs.flinkCore + implementation libs.flinkStreamingJava + implementation libs.flinkClients + implementation libs.flinkConnectorBase + + implementation project(path: ":kudu-client", configuration: "shadow") + + testImplementation libs.junit + testImplementation libs.log4jApi + testImplementation libs.log4jCore + testImplementation libs.log4jSlf4jImpl + testImplementation libs.testcontainers + testImplementation project(path: ":kudu-test-utils", configuration: "shadow") + testImplementation libs.jmhCore + testImplementation libs.jmhGenerator +} diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/KuduSource.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/KuduSource.java new file mode 100644 index 000000000..eeb616903 --- /dev/null +++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/KuduSource.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kudu.replication;

import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import org.apache.kudu.client.AsyncKuduScanner;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduScanToken;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;
import org.apache.kudu.util.HybridTimeUtil;

/**
 * A Flink source that emits the rows of a single Kudu table as strings.
 *
 * <p>The first pass is a full snapshot scan of the table at the current
 * hybrid-time (HT) timestamp; every subsequent pass is a diff scan covering
 * the interval since the previous pass. The source polls on a fixed interval.
 *
 * <p>NOTE(review): {@code lastTimestamp}/{@code isFirstScan} are plain fields,
 * so state is lost on failure/restart and the source restarts with a full
 * snapshot — confirm whether checkpointed state is intended here.
 */
public class KuduSource extends RichSourceFunction<String> {
  private final List<String> kuduMasters;
  private final String tableName;
  /** Milliseconds to wait between consecutive scan passes. */
  private final long scanIntervalMillis = 5000;

  /** Created in open(); not serializable, hence transient. */
  private transient KuduClient client;
  /** Flipped from another thread by cancel(); volatile for visibility in run(). */
  private volatile boolean isRunning = true;
  /** HT timestamp of the previous pass; lower bound of the next diff scan. */
  private long lastTimestamp = 0;
  /** True until the initial snapshot scan has completed. */
  private boolean isFirstScan = true;

  /**
   * @param masterAddresses Kudu master addresses ("host:port") of the source cluster
   * @param tableName name of the table to replicate
   */
  public KuduSource(List<String> masterAddresses, String tableName) {
    this.kuduMasters = masterAddresses;
    this.tableName = tableName;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    // No catch/wrap here: the previous catch only printed the stack trace and
    // rethrew as RuntimeException, losing nothing by letting it propagate.
    client = new KuduClient.KuduClientBuilder(kuduMasters).build();
  }

  @Override
  public void run(SourceContext<String> ctx) throws Exception {
    // The table handle does not change between passes; open it once.
    KuduTable table = client.openTable(tableName);
    while (isRunning) {
      long currentMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis());
      // +1 so the upper bound is exclusive of rows written at exactly this HT value.
      long currentTimestamp = HybridTimeUtil.physicalAndLogicalToHTTimestamp(currentMicros, 0) + 1;

      List<KuduScanToken> tokens;
      if (isFirstScan) {
        // Initial pass: full snapshot of the table as of currentTimestamp.
        tokens = client.newScanTokenBuilder(table)
            .snapshotTimestampRaw(currentTimestamp)
            .readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT)
            .build();
        isFirstScan = false;
      } else {
        // Subsequent passes: only rows changed in (lastTimestamp, currentTimestamp].
        tokens = client.newScanTokenBuilder(table)
            .readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT)
            .diffScan(lastTimestamp, currentTimestamp)
            .build();
      }
      emitRows(tokens, ctx);

      // Remember where this pass ended so the next diff scan starts there.
      lastTimestamp = currentTimestamp;
      Thread.sleep(scanIntervalMillis);
    }
  }

  /**
   * Materializes each scan token into a scanner and emits every row, holding
   * the checkpoint lock around each collect() as the SourceFunction contract
   * requires.
   */
  private void emitRows(List<KuduScanToken> tokens, SourceContext<String> ctx) throws Exception {
    for (KuduScanToken token : tokens) {
      KuduScanner scanner = KuduScanToken.deserializeIntoScanner(token.serialize(), client);
      while (scanner.hasMoreRows()) {
        RowResultIterator results = scanner.nextRows();
        while (results.hasNext()) {
          RowResult result = results.next();
          synchronized (ctx.getCheckpointLock()) {
            ctx.collect(result.rowToString());
          }
        }
      }
    }
  }

  @Override
  public void cancel() {
    // Only flip the flag. The previous implementation also closed the client
    // here, but cancel() runs on a different thread than run(), so closing the
    // client mid-scan raced with run() and additionally caused a double close
    // in close(). Flink always invokes close() after run() returns, which is
    // where the client is released.
    isRunning = false;
  }

  @Override
  public void close() throws Exception {
    if (client != null) {
      client.close();
    }
  }
}
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java new file mode 100644 index 000000000..0cb08e181 --- /dev/null +++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
package org.apache.kudu.replication;

import java.util.ArrayList;
import java.util.Collections;

import org.apache.flink.api.java.utils.ParameterTool;

/**
 * Command-line entry point for the Kudu replication Flink job.
 *
 * <p>Builds a {@link ReplicationJobConfig} from program arguments
 * ({@code --sourceMasterAddresses}, {@code --sinkMasterAddresses},
 * {@code --tableName}) and hands it to a {@link ReplicationJobExecutor}.
 */
public class ReplicationJob {
  // Must be initialized eagerly: the field was previously declared without a
  // value, so every setter call in main() threw a NullPointerException.
  static ReplicationJobConfig config = new ReplicationJobConfig();

  public static void main(String[] args) throws Exception {
    ParameterTool parameters = ParameterTool.fromArgs(args);

    // Defaults point at a single local master / test table; each can be
    // overridden on the command line.
    config.setSourceMasterAddresses(new ArrayList<>(
        Collections.singletonList(parameters.get("sourceMasterAddresses", "127.0.0.1:8764"))));
    config.setSinkMasterAddresses(new ArrayList<>(
        Collections.singletonList(parameters.get("sinkMasterAddresses", "127.0.0.1:8764"))));
    config.setTableName(parameters.get("tableName", "test_table"));

    ReplicationJobExecutor replicationJobExecutor = new ReplicationJobExecutor(config);
    replicationJobExecutor.runJob();
  }
}
// See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.kudu.replication;

import java.util.ArrayList;
import java.util.List;

/**
 * Mutable configuration bean for the replication job: the master addresses of
 * the source and sink Kudu clusters and the name of the table to replicate.
 *
 * <p>List setters take a defensive copy so that later mutation of the caller's
 * list cannot silently change this configuration. Not thread-safe.
 */
public class ReplicationJobConfig {
  private List<String> sourceMasterAddresses;
  private List<String> sinkMasterAddresses;
  private String tableName;

  /** @return the source cluster master addresses, or null if never set */
  public List<String> getSourceMasterAddresses() {
    return sourceMasterAddresses;
  }

  /** @param sourceMasterAddresses master addresses ("host:port") of the source cluster */
  public void setSourceMasterAddresses(List<String> sourceMasterAddresses) {
    // Defensive copy; previously the caller's list was stored directly.
    this.sourceMasterAddresses =
        sourceMasterAddresses == null ? null : new ArrayList<>(sourceMasterAddresses);
  }

  /** @return the sink cluster master addresses, or null if never set */
  public List<String> getSinkMasterAddresses() {
    return sinkMasterAddresses;
  }

  /** @param sinkMasterAddresses master addresses ("host:port") of the sink cluster */
  public void setSinkMasterAddresses(List<String> sinkMasterAddresses) {
    this.sinkMasterAddresses =
        sinkMasterAddresses == null ? null : new ArrayList<>(sinkMasterAddresses);
  }

  /** @return the replicated table's name, or null if never set */
  public String getTableName() {
    return tableName;
  }

  /** @param tableName name of the table to replicate */
  public void setTableName(String tableName) {
    this.tableName = tableName;
  }
}
package org.apache.kudu.replication;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Builds and runs the Flink pipeline described by a {@link ReplicationJobConfig}.
 */
class ReplicationJobExecutor {
  /** Job settings (source/sink masters, table name); expected non-null. */
  ReplicationJobConfig config;

  ReplicationJobExecutor(ReplicationJobConfig config) {
    this.config = config;
  }

  /**
   * Assembles the pipeline (Kudu source -> print sink) and blocks until the
   * Flink job terminates.
   *
   * @throws RuntimeException if the Flink job fails
   */
  public void runJob() {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> kuduSourceStream = env.addSource(
        new KuduSource(config.getSourceMasterAddresses(), config.getTableName()));

    kuduSourceStream.print();

    try {
      env.execute("Kudu Source Job");
    } catch (Exception e) {
      // Previously only e.getMessage() was printed, discarding the stack trace
      // and reporting success to the caller. Propagate so failures are visible.
      throw new RuntimeException("Kudu replication job failed", e);
    }
  }
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.kudu.replication;

import static org.apache.kudu.test.ClientTestUtil.getBasicSchema;

import java.util.Arrays;

import org.apache.kudu.client.KuduClient;
import org.apache.kudu.test.KuduTestHarness;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Initial test scaffolding for the replication job: spins up two independent
 * Kudu mini-clusters (source and sink) and runs the job between them.
 */
public class TestReplicationHarness {
  private static final Logger LOG =
      LoggerFactory.getLogger(TestReplicationHarness.class);
  private static final String TABLE_NAME = "replication_test_table";

  // Two independent mini-clusters: one acts as the replication source, the
  // other as the sink.
  @Rule
  public final KuduTestHarness sourceHarness = new KuduTestHarness();
  @Rule
  public final KuduTestHarness sinkHarness = new KuduTestHarness();

  // Kept for upcoming tests that will create/verify tables on each cluster.
  private KuduClient sourceClient;
  private KuduClient sinkClient;

  @Before
  public void setup() {
    // The previous version also built unused CreateTableOptions/Schema locals
    // (see getBasicSchema()); table creation is still TODO.
    this.sourceClient = sourceHarness.getClient();
    this.sinkClient = sinkHarness.getClient();
  }

  @Test
  public void testReplicationScenario() {
    ReplicationJobConfig config = new ReplicationJobConfig();

    config.setSourceMasterAddresses(
        Arrays.asList(sourceHarness.getMasterAddressesAsString().split(",")));
    config.setSinkMasterAddresses(
        Arrays.asList(sinkHarness.getMasterAddressesAsString().split(",")));
    config.setTableName(TABLE_NAME);

    ReplicationJobExecutor executor = new ReplicationJobExecutor(config);
    // NOTE(review): runJob() blocks until the streaming job terminates, which
    // for an unbounded source may be indefinitely — confirm the intended test
    // duration/termination condition. The trailing assertTrue(true) was removed
    // as it asserted nothing; reaching this point without an exception is the
    // (weak) success criterion for now.
    executor.runJob();
  }
}
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +status = error +name = PropertiesConfig +appenders = console + +appender.console.type = Console +appender.console.name = STDOUT +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss.SSS} [%p - %t] (%F:%L) %m%n + +rootLogger.level = info +rootLogger.appenderRefs = stdout +rootLogger.appenderRef.stdout.ref = STDOUT + +logger.kudu.name = org.apache.kudu +logger.kudu.level = debug \ No newline at end of file diff --git a/java/settings.gradle b/java/settings.gradle index ebe587ab8..7b62d7b1d 100644 --- a/java/settings.gradle +++ b/java/settings.gradle @@ -26,6 +26,7 @@ include "kudu-client" include "kudu-hive" include "kudu-jepsen" include "kudu-proto" +include "kudu-replication" include "kudu-spark" include "kudu-spark-tools" include "kudu-subprocess"
