This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6eab828868 [HUDI-5295] One meta sync failure should not prevent other
meta sync from occurring (#7367)
6eab828868 is described below
commit 6eab82886839de8a597880c72883ea379a24433f
Author: Jon Vexler <[email protected]>
AuthorDate: Wed Dec 7 22:23:34 2022 -0500
[HUDI-5295] One meta sync failure should not prevent other meta sync from
occurring (#7367)
Co-authored-by: Jonathan Vexler <=>
---
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 15 ++-
.../hudi/sync/common/util/SyncUtilHelpers.java | 14 +++
.../hudi/utilities/deltastreamer/DeltaSync.java | 18 ++-
.../deltastreamer/multisync/MockSyncTool1.java | 39 ++++++
.../deltastreamer/multisync/MockSyncTool2.java | 39 ++++++
.../multisync/MockSyncToolException1.java | 37 ++++++
.../multisync/MockSyncToolException2.java | 37 ++++++
.../multisync/TestMultipleMetaSync.java | 135 +++++++++++++++++++++
8 files changed, 328 insertions(+), 6 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 814d7a3d59..97c2c805cc 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -35,7 +35,6 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model._
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient,
TableSchemaResolver}
-import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.{CommitUtils, StringUtils}
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH,
INDEX_CLASS_NAME, KEYGEN_CLASS_NAME}
import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
@@ -66,6 +65,7 @@ import org.apache.spark.{SPARK_VERSION, SparkContext}
import scala.collection.JavaConversions._
import scala.collection.JavaConverters.setAsJavaSetConverter
import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
import scala.util.matching.Regex
object HoodieSparkSqlWriter {
@@ -842,9 +842,20 @@ object HoodieSparkSqlWriter {
properties.put(HoodieSyncConfig.META_SYNC_SPARK_VERSION.key,
SPARK_VERSION)
properties.put(HoodieSyncConfig.META_SYNC_USE_FILE_LISTING_FROM_METADATA.key,
hoodieConfig.getBoolean(HoodieMetadataConfig.ENABLE))
+ // Collect exceptions so every sync tool still runs; throw after the loop.
+ val metaSyncExceptions = new ListBuffer[HoodieException]()
syncClientToolClassSet.foreach(impl => {
- SyncUtilHelpers.runHoodieMetaSync(impl.trim, properties, fs.getConf,
fs, basePath.toString, baseFileFormat)
+ try {
+ SyncUtilHelpers.runHoodieMetaSync(impl.trim, properties, fs.getConf,
fs, basePath.toString, baseFileFormat)
+ } catch {
+ case e: HoodieException =>
+ log.info("SyncTool class " + impl.trim + " failed with exception",
e)
+ metaSyncExceptions.add(e)
+ }
})
+ if (metaSyncExceptions.nonEmpty) {
+ throw SyncUtilHelpers.getExceptionFromList(metaSyncExceptions)
+ }
}
// Since Hive tables are now synced as Spark data source tables which are
cached after Spark SQL queries
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
index e7d61f96cf..26f90facdd 100644
---
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import java.util.Collection;
import java.util.Properties;
/**
@@ -104,4 +105,17 @@ public class SyncUtilHelpers {
+ ": no valid constructor found.");
}
}
+
+ public static HoodieException
getExceptionFromList(Collection<HoodieException> exceptions) {
+ if (exceptions.size() == 1) {
+ return exceptions.stream().findFirst().get();
+ }
+ StringBuilder sb = new StringBuilder();
+ sb.append("Multiple exceptions during meta sync:\n");
+ exceptions.forEach(e -> {
+ sb.append(e.getMessage());
+ sb.append("\n");
+ });
+ return new HoodieException(sb.toString());
+ }
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 03169acc27..9821f393da 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -744,11 +744,21 @@ public class DeltaSync implements Serializable, Closeable
{
props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())));
}
+ // Collect exceptions so every sync tool still runs; throw after the loop.
+ List<HoodieException> metaSyncExceptions = new ArrayList<>();
for (String impl : syncClientToolClasses) {
- Timer.Context syncContext = metrics.getMetaSyncTimerContext();
- SyncUtilHelpers.runHoodieMetaSync(impl.trim(), metaProps, conf, fs,
cfg.targetBasePath, cfg.baseFileFormat);
- long metaSyncTimeMs = syncContext != null ? syncContext.stop() : 0;
-
metrics.updateDeltaStreamerMetaSyncMetrics(getSyncClassShortName(impl),
metaSyncTimeMs);
+ try {
+ Timer.Context syncContext = metrics.getMetaSyncTimerContext();
+ SyncUtilHelpers.runHoodieMetaSync(impl.trim(), metaProps, conf, fs,
cfg.targetBasePath, cfg.baseFileFormat);
+ long metaSyncTimeMs = syncContext != null ? syncContext.stop() : 0;
+
metrics.updateDeltaStreamerMetaSyncMetrics(getSyncClassShortName(impl),
metaSyncTimeMs);
+ } catch (HoodieException e) {
+ LOG.info("SyncTool class " + impl.trim() + " failed with exception",
e);
+ metaSyncExceptions.add(e);
+ }
+ }
+ if (!metaSyncExceptions.isEmpty()) {
+ throw SyncUtilHelpers.getExceptionFromList(metaSyncExceptions);
}
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncTool1.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncTool1.java
new file mode 100644
index 0000000000..9ad2bd601c
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncTool1.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer.multisync;
+
+import org.apache.hudi.sync.common.HoodieSyncTool;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Properties;
+
+public class MockSyncTool1 extends HoodieSyncTool {
+
+ public static boolean syncSuccess;
+
+ public MockSyncTool1(Properties props, Configuration hadoopConf) {
+ super(props, hadoopConf);
+ }
+
+ @Override
+ public void syncHoodieTable() {
+ syncSuccess = true;
+ }
+}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncTool2.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncTool2.java
new file mode 100644
index 0000000000..68d85823c4
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncTool2.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer.multisync;
+
+import org.apache.hudi.sync.common.HoodieSyncTool;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Properties;
+
+public class MockSyncTool2 extends HoodieSyncTool {
+
+ public static boolean syncSuccess;
+
+ public MockSyncTool2(Properties props, Configuration hadoopConf) {
+ super(props, hadoopConf);
+ }
+
+ @Override
+ public void syncHoodieTable() {
+ syncSuccess = true;
+ }
+}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncToolException1.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncToolException1.java
new file mode 100644
index 0000000000..1f8cd1a06f
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncToolException1.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer.multisync;
+
+import org.apache.hudi.sync.common.HoodieSyncTool;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Properties;
+
+public class MockSyncToolException1 extends HoodieSyncTool {
+
+ public MockSyncToolException1(Properties props, Configuration hadoopConf) {
+ super(props, hadoopConf);
+ }
+
+ @Override
+ public void syncHoodieTable() {
+ throw new RuntimeException();
+ }
+}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncToolException2.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncToolException2.java
new file mode 100644
index 0000000000..6dcda81965
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/MockSyncToolException2.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer.multisync;
+
+import org.apache.hudi.sync.common.HoodieSyncTool;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Properties;
+
+public class MockSyncToolException2 extends HoodieSyncTool {
+
+ public MockSyncToolException2(Properties props, Configuration hadoopConf) {
+ super(props, hadoopConf);
+ }
+
+ @Override
+ public void syncHoodieTable() {
+ throw new RuntimeException();
+ }
+}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java
new file mode 100644
index 0000000000..3db11be49d
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer.multisync;
+
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.sources.TestDataSource;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestMultipleMetaSync extends HoodieDeltaStreamerTestBase {
+
+ @AfterAll
+ public static void cleanupClass() {
+ UtilitiesTestBase.cleanupClass();
+ if (testUtils != null) {
+ testUtils.teardown();
+ }
+ }
+
+ @Override
+ @BeforeEach
+ public void setup() throws Exception {
+ super.setup();
+ }
+
+ @Override
+ @AfterEach
+ public void teardown() throws Exception {
+ super.teardown();
+ }
+
+ @Test
+ void testMultipleMetaStore() throws Exception {
+ String tableBasePath = basePath + "/test_multiple_metastore";
+ MockSyncTool1.syncSuccess = false;
+ MockSyncTool2.syncSuccess = false;
+ // Initial bulk insert to ingest to first hudi table
+ HoodieDeltaStreamer.Config cfg = getConfig(tableBasePath,
getSyncNames("MockSyncTool1", "MockSyncTool2"));
+ new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
+ assertTrue(MockSyncTool1.syncSuccess);
+ assertTrue(MockSyncTool2.syncSuccess);
+ }
+
+ @ParameterizedTest
+ @MethodSource("withOneException")
+ void testeWithException(String syncClassNames) {
+ String tableBasePath = basePath + "/test_multiple_metastore_exception";
+ MockSyncTool1.syncSuccess = false;
+ MockSyncTool2.syncSuccess = false;
+ HoodieDeltaStreamer.Config cfg = getConfig(tableBasePath, syncClassNames);
+ Exception e = assertThrows(HoodieException.class, () -> new
HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync());
+
assertTrue(e.getMessage().contains(MockSyncToolException1.class.getName()));
+ assertTrue(MockSyncTool1.syncSuccess);
+ assertTrue(MockSyncTool2.syncSuccess);
+ }
+
+ @Test
+ void testMultipleExceptions() {
+ String tableBasePath = basePath +
"/test_multiple_metastore_multiple_exception";
+ MockSyncTool1.syncSuccess = false;
+ MockSyncTool2.syncSuccess = false;
+ HoodieDeltaStreamer.Config cfg = getConfig(tableBasePath,
getSyncNames("MockSyncTool1", "MockSyncTool2", "MockSyncToolException1",
"MockSyncToolException2"));
+ Exception e = assertThrows(HoodieException.class, () -> new
HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync());
+
assertTrue(e.getMessage().contains(MockSyncToolException1.class.getName()));
+
assertTrue(e.getMessage().contains(MockSyncToolException2.class.getName()));
+ assertTrue(MockSyncTool1.syncSuccess);
+ assertTrue(MockSyncTool2.syncSuccess);
+ }
+
+ HoodieDeltaStreamer.Config getConfig(String basePath, String syncClassNames)
{
+ HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
+ cfg.targetBasePath = basePath;
+ cfg.targetTableName = "hoodie_trips";
+ cfg.tableType = "COPY_ON_WRITE";
+ cfg.sourceClassName = TestDataSource.class.getName();
+ cfg.transformerClassNames =
Collections.singletonList(SqlQueryBasedTransformer.class.getName());
+ cfg.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
+ cfg.syncClientToolClassNames = syncClassNames;
+ cfg.operation = WriteOperationType.BULK_INSERT;
+ cfg.enableHiveSync = true;
+ cfg.sourceOrderingField = "timestamp";
+ cfg.propsFilePath = UtilitiesTestBase.basePath + "/test-source.properties";
+
cfg.configs.add("hoodie.datasource.hive_sync.partition_fields=year,month,day");
+ cfg.sourceLimit = 1000;
+ return cfg;
+ }
+
+ private static String getSyncNames(String... syncs) {
+ return Arrays.stream(syncs).map(s ->
"org.apache.hudi.utilities.deltastreamer.multisync." +
s).collect(Collectors.joining(","));
+ }
+
+ private static Stream<Arguments> withOneException() {
+ return Stream.of(
+ Arguments.of(getSyncNames("MockSyncTool1", "MockSyncTool2",
"MockSyncToolException1")),
+ Arguments.of(getSyncNames("MockSyncTool1", "MockSyncToolException1",
"MockSyncTool2")),
+ Arguments.of(getSyncNames("MockSyncToolException1", "MockSyncTool1",
"MockSyncTool2"))
+ );
+ }
+}