This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.12.2-shadow
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 12a0897eac9ca7059da44c9cb6e625030493087c
Author: Jon Vexler <[email protected]>
AuthorDate: Wed Dec 7 22:23:34 2022 -0500

    [HUDI-5295] One meta sync failure should not prevent other meta sync from 
occurring (#7367)
    
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  14 ++-
 .../hudi/sync/common/util/SyncUtilHelpers.java     |  14 +++
 .../hudi/utilities/deltastreamer/DeltaSync.java    |  18 ++-
 .../deltastreamer/multisync/MockSyncTool1.java     |  39 ++++++
 .../deltastreamer/multisync/MockSyncTool2.java     |  39 ++++++
 .../multisync/MockSyncToolException1.java          |  37 ++++++
 .../multisync/MockSyncToolException2.java          |  37 ++++++
 .../multisync/TestMultipleMetaSync.java            | 135 +++++++++++++++++++++
 8 files changed, 328 insertions(+), 5 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 0122576a679..18309943df1 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -61,6 +61,7 @@ import org.apache.spark.{SPARK_VERSION, SparkContext}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
 import scala.util.matching.Regex
 
 object HoodieSparkSqlWriter {
@@ -693,9 +694,20 @@ object HoodieSparkSqlWriter {
       properties.put(HoodieSyncConfig.META_SYNC_SPARK_VERSION.key, 
SPARK_VERSION)
       
properties.put(HoodieSyncConfig.META_SYNC_USE_FILE_LISTING_FROM_METADATA.key, 
hoodieConfig.getBoolean(HoodieMetadataConfig.ENABLE))
 
+      // Collect exceptions in a list because we want all syncs to run; then we can throw them
+      val metaSyncExceptions = new ListBuffer[HoodieException]()
       syncClientToolClassSet.foreach(impl => {
-        SyncUtilHelpers.runHoodieMetaSync(impl.trim, properties, fs.getConf, 
fs, basePath.toString, baseFileFormat)
+        try {
+          SyncUtilHelpers.runHoodieMetaSync(impl.trim, properties, fs.getConf, 
fs, basePath.toString, baseFileFormat)
+        } catch {
+          case e: HoodieException =>
+            log.info("SyncTool class " + impl.trim + " failed with exception", 
e)
+            metaSyncExceptions.add(e)
+        }
       })
+      if (metaSyncExceptions.nonEmpty) {
+        throw SyncUtilHelpers.getExceptionFromList(metaSyncExceptions)
+      }
     }
 
     // Since Hive tables are now synced as Spark data source tables which are 
cached after Spark SQL queries
diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
index e7d61f96cff..26f90facdd3 100644
--- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import java.util.Collection;
 import java.util.Properties;
 
 /**
@@ -104,4 +105,17 @@ public class SyncUtilHelpers {
           + ": no valid constructor found.");
     }
   }
+
+  public static HoodieException 
getExceptionFromList(Collection<HoodieException> exceptions) {
+    if (exceptions.size() == 1) {
+      return exceptions.stream().findFirst().get();
+    }
+    StringBuilder sb = new StringBuilder();
+    sb.append("Multiple exceptions during meta sync:\n");
+    exceptions.forEach(e -> {
+      sb.append(e.getMessage());
+      sb.append("\n");
+    });
+    return new HoodieException(sb.toString());
+  }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 5b17a9f85be..9d8a5ef991b 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -743,11 +743,21 @@ public class DeltaSync implements Serializable {
             
props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())));
       }
 
+      // Collect exceptions in a list because we want all syncs to run; then we can throw them
+      List<HoodieException> metaSyncExceptions = new ArrayList<>();
       for (String impl : syncClientToolClasses) {
-        Timer.Context syncContext = metrics.getMetaSyncTimerContext();
-        SyncUtilHelpers.runHoodieMetaSync(impl.trim(), metaProps, conf, fs, 
cfg.targetBasePath, cfg.baseFileFormat);
-        long metaSyncTimeMs = syncContext != null ? syncContext.stop() : 0;
-        
metrics.updateDeltaStreamerMetaSyncMetrics(getSyncClassShortName(impl), 
metaSyncTimeMs);
+        try {
+          Timer.Context syncContext = metrics.getMetaSyncTimerContext();
+          SyncUtilHelpers.runHoodieMetaSync(impl.trim(), metaProps, conf, fs, 
cfg.targetBasePath, cfg.baseFileFormat);
+          long metaSyncTimeMs = syncContext != null ? syncContext.stop() : 0;
+          
metrics.updateDeltaStreamerMetaSyncMetrics(getSyncClassShortName(impl), 
metaSyncTimeMs);
+        } catch (HoodieException e) {
+          LOG.info("SyncTool class " + impl.trim() + " failed with exception", 
e);
+          metaSyncExceptions.add(e);
+        }
+      }
+      if (!metaSyncExceptions.isEmpty()) {
+        throw SyncUtilHelpers.getExceptionFromList(metaSyncExceptions);
       }
     }
   }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncTool1.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncTool1.java
new file mode 100644
index 00000000000..9ad2bd601ca
--- /dev/null
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncTool1.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer.multisync;
+
+import org.apache.hudi.sync.common.HoodieSyncTool;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Properties;
+
+public class MockSyncTool1 extends HoodieSyncTool {
+
+  public static boolean syncSuccess;
+
+  public MockSyncTool1(Properties props, Configuration hadoopConf) {
+    super(props, hadoopConf);
+  }
+
+  @Override
+  public void syncHoodieTable() {
+    syncSuccess = true;
+  }
+}
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncTool2.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncTool2.java
new file mode 100644
index 00000000000..68d85823c42
--- /dev/null
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncTool2.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer.multisync;
+
+import org.apache.hudi.sync.common.HoodieSyncTool;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Properties;
+
+public class MockSyncTool2 extends HoodieSyncTool {
+
+  public static boolean syncSuccess;
+
+  public MockSyncTool2(Properties props, Configuration hadoopConf) {
+    super(props, hadoopConf);
+  }
+
+  @Override
+  public void syncHoodieTable() {
+    syncSuccess = true;
+  }
+}
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncToolException1.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncToolException1.java
new file mode 100644
index 00000000000..1f8cd1a06fb
--- /dev/null
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncToolException1.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer.multisync;
+
+import org.apache.hudi.sync.common.HoodieSyncTool;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Properties;
+
+public class MockSyncToolException1 extends HoodieSyncTool {
+
+  public MockSyncToolException1(Properties props, Configuration hadoopConf) {
+    super(props, hadoopConf);
+  }
+
+  @Override
+  public void syncHoodieTable() {
+    throw new RuntimeException();
+  }
+}
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncToolException2.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncToolException2.java
new file mode 100644
index 00000000000..6dcda819652
--- /dev/null
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncToolException2.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer.multisync;
+
+import org.apache.hudi.sync.common.HoodieSyncTool;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Properties;
+
+public class MockSyncToolException2 extends HoodieSyncTool {
+
+  public MockSyncToolException2(Properties props, Configuration hadoopConf) {
+    super(props, hadoopConf);
+  }
+
+  @Override
+  public void syncHoodieTable() {
+    throw new RuntimeException();
+  }
+}
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java
new file mode 100644
index 00000000000..3db11be49da
--- /dev/null
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer.multisync;
+
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.sources.TestDataSource;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestMultipleMetaSync extends HoodieDeltaStreamerTestBase {
+
+  @AfterAll
+  public static void cleanupClass() {
+    UtilitiesTestBase.cleanupClass();
+    if (testUtils != null) {
+      testUtils.teardown();
+    }
+  }
+
+  @Override
+  @BeforeEach
+  public void setup() throws Exception {
+    super.setup();
+  }
+
+  @Override
+  @AfterEach
+  public void teardown() throws Exception {
+    super.teardown();
+  }
+
+  @Test
+  void testMultipleMetaStore() throws Exception {
+    String tableBasePath = basePath + "/test_multiple_metastore";
+    MockSyncTool1.syncSuccess = false;
+    MockSyncTool2.syncSuccess = false;
+    // Initial bulk insert to ingest to first hudi table
+    HoodieDeltaStreamer.Config cfg = getConfig(tableBasePath, 
getSyncNames("MockSyncTool1", "MockSyncTool2"));
+    new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
+    assertTrue(MockSyncTool1.syncSuccess);
+    assertTrue(MockSyncTool2.syncSuccess);
+  }
+
+  @ParameterizedTest
+  @MethodSource("withOneException")
+  void testWithException(String syncClassNames) {
+    String tableBasePath = basePath + "/test_multiple_metastore_exception";
+    MockSyncTool1.syncSuccess = false;
+    MockSyncTool2.syncSuccess = false;
+    HoodieDeltaStreamer.Config cfg = getConfig(tableBasePath, syncClassNames);
+    Exception e = assertThrows(HoodieException.class, () -> new 
HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync());
+    
assertTrue(e.getMessage().contains(MockSyncToolException1.class.getName()));
+    assertTrue(MockSyncTool1.syncSuccess);
+    assertTrue(MockSyncTool2.syncSuccess);
+  }
+
+  @Test
+  void testMultipleExceptions() {
+    String tableBasePath = basePath + 
"/test_multiple_metastore_multiple_exception";
+    MockSyncTool1.syncSuccess = false;
+    MockSyncTool2.syncSuccess = false;
+    HoodieDeltaStreamer.Config cfg = getConfig(tableBasePath, 
getSyncNames("MockSyncTool1", "MockSyncTool2", "MockSyncToolException1", 
"MockSyncToolException2"));
+    Exception e = assertThrows(HoodieException.class, () -> new 
HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync());
+    
assertTrue(e.getMessage().contains(MockSyncToolException1.class.getName()));
+    
assertTrue(e.getMessage().contains(MockSyncToolException2.class.getName()));
+    assertTrue(MockSyncTool1.syncSuccess);
+    assertTrue(MockSyncTool2.syncSuccess);
+  }
+
+  HoodieDeltaStreamer.Config getConfig(String basePath, String syncClassNames) 
{
+    HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
+    cfg.targetBasePath = basePath;
+    cfg.targetTableName = "hoodie_trips";
+    cfg.tableType = "COPY_ON_WRITE";
+    cfg.sourceClassName = TestDataSource.class.getName();
+    cfg.transformerClassNames = 
Collections.singletonList(SqlQueryBasedTransformer.class.getName());
+    cfg.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
+    cfg.syncClientToolClassNames = syncClassNames;
+    cfg.operation = WriteOperationType.BULK_INSERT;
+    cfg.enableHiveSync = true;
+    cfg.sourceOrderingField = "timestamp";
+    cfg.propsFilePath = UtilitiesTestBase.basePath + "/test-source.properties";
+    
cfg.configs.add("hoodie.datasource.hive_sync.partition_fields=year,month,day");
+    cfg.sourceLimit =  1000;
+    return cfg;
+  }
+
+  private static String getSyncNames(String... syncs) {
+    return Arrays.stream(syncs).map(s -> 
"org.apache.hudi.utilities.deltastreamer.multisync." + 
s).collect(Collectors.joining(","));
+  }
+
+  private static Stream<Arguments> withOneException()  {
+    return Stream.of(
+        Arguments.of(getSyncNames("MockSyncTool1", "MockSyncTool2", 
"MockSyncToolException1")),
+        Arguments.of(getSyncNames("MockSyncTool1", "MockSyncToolException1", 
"MockSyncTool2")),
+        Arguments.of(getSyncNames("MockSyncToolException1", "MockSyncTool1", 
"MockSyncTool2"))
+    );
+  }
+}

Reply via email to