This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6508b11d7c [MINOR] Performance improvement of flink ITs with reused
miniCluster (#7151)
6508b11d7c is described below
commit 6508b11d7c1c1e4cb22aac86f9977cd951a91c9b
Author: Alexander Trushev <[email protected]>
AuthorDate: Thu Nov 10 15:38:43 2022 +0700
[MINOR] Performance improvement of flink ITs with reused miniCluster (#7151)
* Implement a MiniCluster extension compatible with JUnit 5
---
.../apache/hudi/sink/ITTestDataStreamWrite.java | 3 +
.../sink/cluster/ITTestHoodieFlinkClustering.java | 3 +
.../sink/compact/ITTestHoodieFlinkCompactor.java | 6 +-
.../apache/hudi/table/ITTestHoodieDataSource.java | 6 +-
.../org/apache/hudi/utils/FlinkMiniCluster.java | 82 ++++++++++++++++++++++
5 files changed, 97 insertions(+), 3 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 193c0abcd8..6ab4b1b6e0 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -29,6 +29,7 @@ import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.HoodiePipeline;
import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
@@ -54,6 +55,7 @@ import
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.TestLogger;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -71,6 +73,7 @@ import java.util.concurrent.TimeUnit;
/**
* Integration test for Flink Hoodie stream sink.
*/
+@ExtendWith(FlinkMiniCluster.class)
public class ITTestDataStreamWrite extends TestLogger {
private static final Map<String, List<String>> EXPECTED = new HashMap<>();
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
index f50f5748be..4c0fe82e44 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
@@ -39,6 +39,7 @@ import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestSQL;
@@ -56,6 +57,7 @@ import
org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
@@ -69,6 +71,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* IT cases for {@link HoodieFlinkClusteringJob}.
*/
+@ExtendWith(FlinkMiniCluster.class)
public class ITTestHoodieFlinkClustering {
private static final Map<String, String> EXPECTED = new HashMap<>();
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index 05d9d19854..ee9285d60a 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -30,6 +30,7 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestSQL;
@@ -43,6 +44,7 @@ import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -63,6 +65,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* IT cases for {@link org.apache.hudi.common.model.HoodieRecord}.
*/
+@ExtendWith(FlinkMiniCluster.class)
public class ITTestHoodieFlinkCompactor {
protected static final Logger LOG =
LoggerFactory.getLogger(ITTestHoodieFlinkCompactor.class);
@@ -155,7 +158,7 @@ public class ITTestHoodieFlinkCompactor {
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
new ProcessOperator<>(new CompactFunction(conf)))
- .setParallelism(compactionPlan.getOperations().size())
+ .setParallelism(FlinkMiniCluster.DEFAULT_PARALLELISM)
.addSink(new CompactionCommitSink(conf))
.name("clean_commits")
.uid("uid_clean_commits")
@@ -195,6 +198,7 @@ public class ITTestHoodieFlinkCompactor {
cfg.schedule = true;
Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+ conf.setInteger(FlinkOptions.COMPACTION_TASKS.key(),
FlinkMiniCluster.DEFAULT_PARALLELISM);
HoodieFlinkCompactor.AsyncCompactionService asyncCompactionService = new
HoodieFlinkCompactor.AsyncCompactionService(cfg, conf, env);
asyncCompactionService.start(null);
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index f7d2899a5b..865a717e08 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -27,6 +27,7 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.table.catalog.HoodieCatalogTestUtils;
import org.apache.hudi.table.catalog.HoodieHiveCatalog;
import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestSQL;
@@ -44,11 +45,11 @@ import
org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -77,7 +78,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* IT cases for Hoodie table source and sink.
*/
-public class ITTestHoodieDataSource extends AbstractTestBase {
+@ExtendWith(FlinkMiniCluster.class)
+public class ITTestHoodieDataSource {
private TableEnvironment streamTableEnv;
private TableEnvironment batchTableEnv;
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/FlinkMiniCluster.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/FlinkMiniCluster.java
new file mode 100644
index 0000000000..96d07cd656
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/FlinkMiniCluster.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utils;
+
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class for tests that run multiple tests and want to reuse the same Flink cluster.
+ * Unlike {@link AbstractTestBase}, this class is designed to run with JUnit 5.
+ *
+ * <p>Register it with {@code @ExtendWith(FlinkMiniCluster.class)}: the cluster (one task manager
+ * with {@link #DEFAULT_PARALLELISM} slots) is started before the first test of the annotated class
+ * and stopped after its last test. After every test, jobs that are not yet in a terminal state are
+ * cancelled so they do not leak into the next test.
+ *
+ * <p>NOTE(review): the cluster lives in a static field, so running several annotated test classes
+ * concurrently is presumably unsafe; confirm against the build's parallel test settings.
+ */
+public class FlinkMiniCluster implements BeforeAllCallback, AfterAllCallback, AfterEachCallback {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkMiniCluster.class);
+
+  /** Total number of task slots offered by the shared cluster (a single task manager). */
+  public static final int DEFAULT_PARALLELISM = 4;
+
+  private static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      new MiniClusterWithClientResource(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(1)
+              .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+              .build());
+
+  /** Starts the shared mini cluster before the first test of the extended class. */
+  @Override
+  public void beforeAll(ExtensionContext context) throws Exception {
+    MINI_CLUSTER_RESOURCE.before();
+  }
+
+  /** Stops the shared mini cluster after the last test of the extended class. */
+  @Override
+  public void afterAll(ExtensionContext context) {
+    MINI_CLUSTER_RESOURCE.after();
+  }
+
+  /** Cancels any job left running by the test that just finished. */
+  @Override
+  public void afterEach(ExtensionContext context) throws Exception {
+    cleanupRunningJobs();
+  }
+
+  /**
+   * Best-effort cancellation of every job that has not reached a terminal state.
+   *
+   * <p>A failed cancellation is logged and does not abort the cleanup of the remaining jobs, nor
+   * fail the test. An interrupt while waiting for a cancellation restores the thread's interrupt
+   * flag and is rethrown.
+   *
+   * @throws Exception if the job list cannot be fetched or the thread is interrupted
+   */
+  private void cleanupRunningJobs() throws Exception {
+    if (!MINI_CLUSTER_RESOURCE.getMiniCluster().isRunning()) {
+      // do nothing if the MiniCluster is not running
+      LOG.warn("Mini cluster is not running after the test!");
+      return;
+    }
+
+    for (JobStatusMessage job : MINI_CLUSTER_RESOURCE.getClusterClient().listJobs().get()) {
+      if (job.getJobState().isTerminalState()) {
+        continue;
+      }
+      try {
+        MINI_CLUSTER_RESOURCE.getClusterClient().cancel(job.getJobId()).get();
+      } catch (InterruptedException e) {
+        // keep the interrupt visible to the caller instead of swallowing it
+        Thread.currentThread().interrupt();
+        throw e;
+      } catch (Exception e) {
+        // best effort: a dangling job must not fail the test, but leave a trace for debugging
+        LOG.warn("Failed to cancel dangling job {}", job.getJobId(), e);
+      }
+    }
+  }
+}