This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6508b11d7c [MINOR] Performance improvement of flink ITs with reused miniCluster (#7151)
6508b11d7c is described below

commit 6508b11d7c1c1e4cb22aac86f9977cd951a91c9b
Author: Alexander Trushev <[email protected]>
AuthorDate: Thu Nov 10 15:38:43 2022 +0700

    [MINOR] Performance improvement of flink ITs with reused miniCluster (#7151)
    
    * implement MiniCluster extension compatible with junit5
---
 .../apache/hudi/sink/ITTestDataStreamWrite.java    |  3 +
 .../sink/cluster/ITTestHoodieFlinkClustering.java  |  3 +
 .../sink/compact/ITTestHoodieFlinkCompactor.java   |  6 +-
 .../apache/hudi/table/ITTestHoodieDataSource.java  |  6 +-
 .../org/apache/hudi/utils/FlinkMiniCluster.java    | 82 ++++++++++++++++++++++
 5 files changed, 97 insertions(+), 3 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 193c0abcd8..6ab4b1b6e0 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -29,6 +29,7 @@ import org.apache.hudi.sink.utils.Pipelines;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.HoodiePipeline;
 import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.FlinkMiniCluster;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
 import org.apache.hudi.utils.TestUtils;
@@ -54,6 +55,7 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.TestLogger;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -71,6 +73,7 @@ import java.util.concurrent.TimeUnit;
 /**
  * Integration test for Flink Hoodie stream sink.
  */
+@ExtendWith(FlinkMiniCluster.class)
 public class ITTestDataStreamWrite extends TestLogger {
 
   private static final Map<String, List<String>> EXPECTED = new HashMap<>();
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
index f50f5748be..4c0fe82e44 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
@@ -39,6 +39,7 @@ import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.FlinkMiniCluster;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
 import org.apache.hudi.utils.TestSQL;
@@ -56,6 +57,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
@@ -69,6 +71,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 /**
  * IT cases for {@link HoodieFlinkClusteringJob}.
  */
+@ExtendWith(FlinkMiniCluster.class)
 public class ITTestHoodieFlinkClustering {
 
   private static final Map<String, String> EXPECTED = new HashMap<>();
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index 05d9d19854..ee9285d60a 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -30,6 +30,7 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.FlinkMiniCluster;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
 import org.apache.hudi.utils.TestSQL;
@@ -43,6 +44,7 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -63,6 +65,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 /**
  * IT cases for {@link org.apache.hudi.common.model.HoodieRecord}.
  */
+@ExtendWith(FlinkMiniCluster.class)
 public class ITTestHoodieFlinkCompactor {
 
   protected static final Logger LOG = LoggerFactory.getLogger(ITTestHoodieFlinkCompactor.class);
@@ -155,7 +158,7 @@ public class ITTestHoodieFlinkCompactor {
         .transform("compact_task",
             TypeInformation.of(CompactionCommitEvent.class),
             new ProcessOperator<>(new CompactFunction(conf)))
-        .setParallelism(compactionPlan.getOperations().size())
+        .setParallelism(FlinkMiniCluster.DEFAULT_PARALLELISM)
         .addSink(new CompactionCommitSink(conf))
         .name("clean_commits")
         .uid("uid_clean_commits")
@@ -195,6 +198,7 @@ public class ITTestHoodieFlinkCompactor {
     cfg.schedule = true;
     Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
     conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+    conf.setInteger(FlinkOptions.COMPACTION_TASKS.key(), FlinkMiniCluster.DEFAULT_PARALLELISM);
 
     HoodieFlinkCompactor.AsyncCompactionService asyncCompactionService = new HoodieFlinkCompactor.AsyncCompactionService(cfg, conf, env);
     asyncCompactionService.start(null);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index f7d2899a5b..865a717e08 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -27,6 +27,7 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.table.catalog.HoodieCatalogTestUtils;
 import org.apache.hudi.table.catalog.HoodieHiveCatalog;
 import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.FlinkMiniCluster;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
 import org.apache.hudi.utils.TestSQL;
@@ -44,11 +45,11 @@ import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -77,7 +78,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 /**
  * IT cases for Hoodie table source and sink.
  */
-public class ITTestHoodieDataSource extends AbstractTestBase {
+@ExtendWith(FlinkMiniCluster.class)
+public class ITTestHoodieDataSource {
   private TableEnvironment streamTableEnv;
   private TableEnvironment batchTableEnv;
 
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/FlinkMiniCluster.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/FlinkMiniCluster.java
new file mode 100644
index 0000000000..96d07cd656
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/FlinkMiniCluster.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utils;
+
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class for tests that run multiple tests and want to reuse the same Flink cluster.
+ * Unlike {@link AbstractTestBase}, this class is designed to run with JUnit 5.
+ */
+public class FlinkMiniCluster implements BeforeAllCallback, AfterAllCallback, AfterEachCallback {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkMiniCluster.class);
+
+  public static final int DEFAULT_PARALLELISM = 4;
+
+  private static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      new MiniClusterWithClientResource(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(1)
+              .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+              .build());
+
+  @Override
+  public void beforeAll(ExtensionContext context) throws Exception {
+    MINI_CLUSTER_RESOURCE.before();
+  }
+
+  @Override
+  public void afterAll(ExtensionContext context) {
+    MINI_CLUSTER_RESOURCE.after();
+  }
+
+  @Override
+  public void afterEach(ExtensionContext context) throws Exception {
+    cleanupRunningJobs();
+  }
+
+  private void cleanupRunningJobs() throws Exception {
+    if (!MINI_CLUSTER_RESOURCE.getMiniCluster().isRunning()) {
+      // do nothing if the MiniCluster is not running
+      LOG.warn("Mini cluster is not running after the test!");
+      return;
+    }
+
+    for (JobStatusMessage path : MINI_CLUSTER_RESOURCE.getClusterClient().listJobs().get()) {
+      if (!path.getJobState().isTerminalState()) {
+        try {
+          MINI_CLUSTER_RESOURCE.getClusterClient().cancel(path.getJobId()).get();
+        } catch (Exception ignored) {
+          // ignore exceptions when cancelling dangling jobs
+        }
+      }
+    }
+  }
+}

Reply via email to