This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.12.2-shadow in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 12a0897eac9ca7059da44c9cb6e625030493087c Author: Jon Vexler <[email protected]> AuthorDate: Wed Dec 7 22:23:34 2022 -0500 [HUDI-5295] One meta sync failure should not prevent other meta sync from occurring (#7367) Co-authored-by: Jonathan Vexler <=> --- .../org/apache/hudi/HoodieSparkSqlWriter.scala | 14 ++- .../hudi/sync/common/util/SyncUtilHelpers.java | 14 +++ .../hudi/utilities/deltastreamer/DeltaSync.java | 18 ++- .../deltastreamer/multisync/MockSyncTool1.java | 39 ++++++ .../deltastreamer/multisync/MockSyncTool2.java | 39 ++++++ .../multisync/MockSyncToolException1.java | 37 ++++++ .../multisync/MockSyncToolException2.java | 37 ++++++ .../multisync/TestMultipleMetaSync.java | 135 +++++++++++++++++++++ 8 files changed, 328 insertions(+), 5 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 0122576a679..18309943df1 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -61,6 +61,7 @@ import org.apache.spark.{SPARK_VERSION, SparkContext} import scala.collection.JavaConversions._ import scala.collection.mutable +import scala.collection.mutable.ListBuffer import scala.util.matching.Regex object HoodieSparkSqlWriter { @@ -693,9 +694,20 @@ object HoodieSparkSqlWriter { properties.put(HoodieSyncConfig.META_SYNC_SPARK_VERSION.key, SPARK_VERSION) properties.put(HoodieSyncConfig.META_SYNC_USE_FILE_LISTING_FROM_METADATA.key, hoodieConfig.getBoolean(HoodieMetadataConfig.ENABLE)) + //Collect exceptions in list because we want all sync to run. Then we can throw + val metaSyncExceptions = new ListBuffer[HoodieException]() syncClientToolClassSet.foreach(impl => { - SyncUtilHelpers.runHoodieMetaSync(impl.trim, properties, fs.getConf, fs, basePath.toString, baseFileFormat) + try { + SyncUtilHelpers.runHoodieMetaSync(impl.trim, properties, fs.getConf, fs, basePath.toString, baseFileFormat) + } catch { + case e: HoodieException => + log.info("SyncTool class " + impl.trim + " failed with exception", e) + metaSyncExceptions.add(e) + } }) + if (metaSyncExceptions.nonEmpty) { + throw SyncUtilHelpers.getExceptionFromList(metaSyncExceptions) + } } // Since Hive tables are now synced as Spark data source tables which are cached after Spark SQL queries diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java index e7d61f96cff..26f90facdd3 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import java.util.Collection; import java.util.Properties; /** @@ -104,4 +105,17 @@ public class SyncUtilHelpers { + ": no valid constructor found."); } } + + public static HoodieException getExceptionFromList(Collection<HoodieException> exceptions) { + if (exceptions.size() == 1) { + return exceptions.stream().findFirst().get(); + } + StringBuilder sb = new StringBuilder(); + sb.append("Multiple exceptions during meta sync:\n"); + exceptions.forEach(e -> { + sb.append(e.getMessage()); + sb.append("\n"); + }); + return new HoodieException(sb.toString()); + } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 5b17a9f85be..9d8a5ef991b 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -743,11 +743,21 @@ public class DeltaSync implements Serializable { props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key()))); } + //Collect exceptions in list because we want all sync to run. Then we can throw + List<HoodieException> metaSyncExceptions = new ArrayList<>(); for (String impl : syncClientToolClasses) { - Timer.Context syncContext = metrics.getMetaSyncTimerContext(); - SyncUtilHelpers.runHoodieMetaSync(impl.trim(), metaProps, conf, fs, cfg.targetBasePath, cfg.baseFileFormat); - long metaSyncTimeMs = syncContext != null ? syncContext.stop() : 0; - metrics.updateDeltaStreamerMetaSyncMetrics(getSyncClassShortName(impl), metaSyncTimeMs); + try { + Timer.Context syncContext = metrics.getMetaSyncTimerContext(); + SyncUtilHelpers.runHoodieMetaSync(impl.trim(), metaProps, conf, fs, cfg.targetBasePath, cfg.baseFileFormat); + long metaSyncTimeMs = syncContext != null ? syncContext.stop() : 0; + metrics.updateDeltaStreamerMetaSyncMetrics(getSyncClassShortName(impl), metaSyncTimeMs); + } catch (HoodieException e) { + LOG.info("SyncTool class " + impl.trim() + " failed with exception", e); + metaSyncExceptions.add(e); + } + } + if (!metaSyncExceptions.isEmpty()) { + throw SyncUtilHelpers.getExceptionFromList(metaSyncExceptions); } } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncTool1.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncTool1.java new file mode 100644 index 00000000000..9ad2bd601ca --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncTool1.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.deltastreamer.multisync; + +import org.apache.hudi.sync.common.HoodieSyncTool; + +import org.apache.hadoop.conf.Configuration; + +import java.util.Properties; + +public class MockSyncTool1 extends HoodieSyncTool { + + public static boolean syncSuccess; + + public MockSyncTool1(Properties props, Configuration hadoopConf) { + super(props, hadoopConf); + } + + @Override + public void syncHoodieTable() { + syncSuccess = true; + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncTool2.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncTool2.java new file mode 100644 index 00000000000..68d85823c42 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncTool2.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.deltastreamer.multisync; + +import org.apache.hudi.sync.common.HoodieSyncTool; + +import org.apache.hadoop.conf.Configuration; + +import java.util.Properties; + +public class MockSyncTool2 extends HoodieSyncTool { + + public static boolean syncSuccess; + + public MockSyncTool2(Properties props, Configuration hadoopConf) { + super(props, hadoopConf); + } + + @Override + public void syncHoodieTable() { + syncSuccess = true; + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncToolException1.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncToolException1.java new file mode 100644 index 00000000000..1f8cd1a06fb --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncToolException1.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.deltastreamer.multisync; + +import org.apache.hudi.sync.common.HoodieSyncTool; + +import org.apache.hadoop.conf.Configuration; + +import java.util.Properties; + +public class MockSyncToolException1 extends HoodieSyncTool { + + public MockSyncToolException1(Properties props, Configuration hadoopConf) { + super(props, hadoopConf); + } + + @Override + public void syncHoodieTable() { + throw new RuntimeException(); + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncToolException2.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncToolException2.java new file mode 100644 index 00000000000..6dcda819652 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncToolException2.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.deltastreamer.multisync; + +import org.apache.hudi.sync.common.HoodieSyncTool; + +import org.apache.hadoop.conf.Configuration; + +import java.util.Properties; + +public class MockSyncToolException2 extends HoodieSyncTool { + + public MockSyncToolException2(Properties props, Configuration hadoopConf) { + super(props, hadoopConf); + } + + @Override + public void syncHoodieTable() { + throw new RuntimeException(); + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java new file mode 100644 index 00000000000..3db11be49da --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.deltastreamer.multisync; + +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; +import org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase; +import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; +import org.apache.hudi.utilities.sources.TestDataSource; +import org.apache.hudi.utilities.testutils.UtilitiesTestBase; +import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Arrays; +import java.util.Collections; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestMultipleMetaSync extends HoodieDeltaStreamerTestBase { + + @AfterAll + public static void cleanupClass() { + UtilitiesTestBase.cleanupClass(); + if (testUtils != null) { + testUtils.teardown(); + } + } + + @Override + @BeforeEach + public void setup() throws Exception { + super.setup(); + } + + @Override + @AfterEach + public void teardown() throws Exception { + super.teardown(); + } + + @Test + void testMultipleMetaStore() throws Exception { + String tableBasePath = basePath + "/test_multiple_metastore"; + MockSyncTool1.syncSuccess = false; + MockSyncTool2.syncSuccess = false; + // Initial bulk insert to ingest to first hudi table + HoodieDeltaStreamer.Config cfg = getConfig(tableBasePath, getSyncNames("MockSyncTool1", "MockSyncTool2")); + new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync(); + assertTrue(MockSyncTool1.syncSuccess); + assertTrue(MockSyncTool2.syncSuccess); + } + + @ParameterizedTest + @MethodSource("withOneException") + void testeWithException(String syncClassNames) { + String tableBasePath = basePath + "/test_multiple_metastore_exception"; + MockSyncTool1.syncSuccess = false; + MockSyncTool2.syncSuccess = false; + HoodieDeltaStreamer.Config cfg = getConfig(tableBasePath, syncClassNames); + Exception e = assertThrows(HoodieException.class, () -> new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync()); + assertTrue(e.getMessage().contains(MockSyncToolException1.class.getName())); + assertTrue(MockSyncTool1.syncSuccess); + assertTrue(MockSyncTool2.syncSuccess); + } + + @Test + void testMultipleExceptions() { + String tableBasePath = basePath + "/test_multiple_metastore_multiple_exception"; + MockSyncTool1.syncSuccess = false; + MockSyncTool2.syncSuccess = false; + HoodieDeltaStreamer.Config cfg = getConfig(tableBasePath, getSyncNames("MockSyncTool1", "MockSyncTool2", "MockSyncToolException1", "MockSyncToolException2")); + Exception e = assertThrows(HoodieException.class, () -> new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync()); + assertTrue(e.getMessage().contains(MockSyncToolException1.class.getName())); + assertTrue(e.getMessage().contains(MockSyncToolException2.class.getName())); + assertTrue(MockSyncTool1.syncSuccess); + assertTrue(MockSyncTool2.syncSuccess); + } + + HoodieDeltaStreamer.Config getConfig(String basePath, String syncClassNames) { + HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); + cfg.targetBasePath = basePath; + cfg.targetTableName = "hoodie_trips"; + cfg.tableType = "COPY_ON_WRITE"; + cfg.sourceClassName = TestDataSource.class.getName(); + cfg.transformerClassNames = Collections.singletonList(SqlQueryBasedTransformer.class.getName()); + cfg.schemaProviderClassName = FilebasedSchemaProvider.class.getName(); + cfg.syncClientToolClassNames = syncClassNames; + cfg.operation = WriteOperationType.BULK_INSERT; + cfg.enableHiveSync = true; + cfg.sourceOrderingField = "timestamp"; + cfg.propsFilePath = UtilitiesTestBase.basePath + "/test-source.properties"; + cfg.configs.add("hoodie.datasource.hive_sync.partition_fields=year,month,day"); + cfg.sourceLimit = 1000; + return cfg; + } + + private static String getSyncNames(String... syncs) { + return Arrays.stream(syncs).map(s -> "org.apache.hudi.utilities.deltastreamer.multisync." + s).collect(Collectors.joining(",")); + } + + private static Stream<Arguments> withOneException() { + return Stream.of( + Arguments.of(getSyncNames("MockSyncTool1", "MockSyncTool2", "MockSyncToolException1")), + Arguments.of(getSyncNames("MockSyncTool1", "MockSyncToolException1", "MockSyncTool2")), + Arguments.of(getSyncNames("MockSyncToolException1", "MockSyncTool1", "MockSyncTool2")) + ); + } +}
