the-other-tim-brown commented on code in PR #17758:
URL: https://github.com/apache/hudi/pull/17758#discussion_r2655671087
##########
hudi-tests-common/pom.xml:
##########
@@ -244,10 +244,5 @@
<artifactId>testcontainers</artifactId>
<scope>compile</scope>
</dependency>
- <dependency>
- <groupId>org.testcontainers</groupId>
Review Comment:
Moved this to be where it is required
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/KafkaTestUtils.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.testutils;
+
+import org.apache.hudi.common.util.StringUtils;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.kafka.ConfluentKafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+
+public class KafkaTestUtils {
Review Comment:
Drop-in replacement for the existing KafkaTestUtils that does not require
ZooKeeper
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java:
##########
@@ -704,31 +702,31 @@ static HoodieDeltaStreamer.Config
makeConfigForHudiIncrSrc(String srcBasePath, S
static void assertAtleastNCompactionCommits(int minExpected, String
tablePath) {
HoodieTableMetaClient meta = createMetaClient(storage, tablePath);
HoodieTimeline timeline =
meta.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants();
- LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
+ LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
Review Comment:
Fixed logging to avoid the toString calls while I was updating this file
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java:
##########
@@ -140,15 +140,12 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
protected static String topicName;
protected static String defaultSchemaProviderClassName =
FilebasedSchemaProvider.class.getName();
protected static int testNum = 1;
-
- Map<String, String> hudiOpts = new HashMap<>();
- public KafkaTestUtils testUtils;
+ protected static KafkaTestUtils testUtils;
Review Comment:
Tests now set up Kafka only once to cut down on the overhead
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java:
##########
@@ -49,7 +49,7 @@ void testMultipleMetaStore() throws Exception {
MockSyncTool2.syncSuccess = false;
// Initial bulk insert to ingest to first hudi table
HoodieDeltaStreamer.Config cfg = getConfig(tableBasePath,
getSyncNames("MockSyncTool1", "MockSyncTool2"));
- new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
+ syncOnce(new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()));
Review Comment:
Many changes in this PR use utils or other means to ensure the
streamer is shut down
##########
azure-pipelines-20230430.yml:
##########
@@ -378,6 +378,7 @@ stages:
command: 'run'
arguments: >
-v $(Build.SourcesDirectory):/hudi
+ -v /var/run/docker.sock:/var/run/docker.sock
Review Comment:
This is required for running Docker-in-Docker
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieMultiTableStreamer.java:
##########
@@ -460,8 +460,10 @@ private static String resetTarget(Config configuration,
String database, String
public void sync() {
for (TableExecutionContext context : tableExecutionContexts) {
try {
- new HoodieStreamer(context.getConfig(), jssc,
Option.ofNullable(context.getProperties())).sync();
+ HoodieStreamer streamer = new HoodieStreamer(context.getConfig(),
jssc, Option.ofNullable(context.getProperties()));
+ streamer.sync();
successTables.add(Helpers.getTableWithDatabase(context));
+ streamer.shutdownGracefully();
Review Comment:
Ensure the instances are shut down properly
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]