Repository: incubator-gobblin Updated Branches: refs/heads/master b67fd71b1 -> 90be15f47
[GOBBLIN-222] Fix silent failure for loading incompatible state-store Fix silent failure for loading incompatible state-store Fix the test failure: The executor doesn't handle the exception properly and we add a logAndThrowFailures function in IteratorExecutor Rename the shim layer package but keep it temporarily; Added runtime package-name changing routine in gobblin resolving conflicts Resolve merge conflicts Remove unnecessary xml change because of IntelliJ's renaming, fixed findbugsMain change the shim layer package name back for safety Closes #2073 from autumnust/deserialization Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/90be15f4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/90be15f4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/90be15f4 Branch: refs/heads/master Commit: 90be15f470b1427c935fe11c6595b9f8184536ec Parents: b67fd71 Author: Lei Sun <[email protected]> Authored: Tue Aug 29 16:46:18 2017 -0700 Committer: Abhishek Tiwari <[email protected]> Committed: Tue Aug 29 16:46:18 2017 -0700 ---------------------------------------------------------------------- conf/log4j-compaction.xml | 2 +- conf/log4j-mapreduce.xml | 4 +- .../extractor/CheckpointableWatermark.java | 2 +- .../apache/gobblin/metastore/FsStateStore.java | 13 +++--- .../local/gobblin-oozie-example-workflow.xml | 2 +- .../gobblin/runtime/FsDatasetStateStore.java | 22 +++++++-- .../runtime/FsDatasetStateStoreTest.java | 26 +++++++++++ .../util/executors/IteratorExecutor.java | 31 +++++++++++++ .../util/hadoop/GobblinSequenceFileReader.java | 49 ++++++++++++++++++++ 9 files changed, 137 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90be15f4/conf/log4j-compaction.xml ---------------------------------------------------------------------- diff 
--git a/conf/log4j-compaction.xml b/conf/log4j-compaction.xml index 8b42725..cfce89c 100644 --- a/conf/log4j-compaction.xml +++ b/conf/log4j-compaction.xml @@ -36,7 +36,7 @@ </layout> </appender> - <logger name="gobblin.compaction" additivity="false"> + <logger name="org.apache.gobblin.compaction" additivity="false"> <level value="info" /> <appender-ref ref="console" /> </logger> http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90be15f4/conf/log4j-mapreduce.xml ---------------------------------------------------------------------- diff --git a/conf/log4j-mapreduce.xml b/conf/log4j-mapreduce.xml index 5ab248c..826493a 100644 --- a/conf/log4j-mapreduce.xml +++ b/conf/log4j-mapreduce.xml @@ -42,12 +42,12 @@ </layout> </appender> - <logger name="gobblin.runtime" additivity="false"> + <logger name="org.apache.gobblin.runtime" additivity="false"> <level value="INFO"/> <appender-ref ref="FileRoll" /> </logger> - <logger name="gobblin.runtime.mapreduce.CliMRJobLauncher" additivity="false"> + <logger name="org.apache.gobblin.runtime.mapreduce.CliMRJobLauncher" additivity="false"> <level value="ERROR"/> <appender-ref ref="Console" /> </logger> http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90be15f4/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/CheckpointableWatermark.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/CheckpointableWatermark.java b/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/CheckpointableWatermark.java index d257e5c..7d2fbab 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/CheckpointableWatermark.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/CheckpointableWatermark.java @@ -38,4 +38,4 @@ public interface CheckpointableWatermark extends Watermark, Comparable<Checkpoin */ ComparableWatermark getWatermark(); -} +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90be15f4/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java index 96c3e51..54dbdd7 100644 --- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java @@ -19,13 +19,12 @@ package org.apache.gobblin.metastore; import static org.apache.gobblin.util.HadoopUtils.FS_SCHEMES_NON_ATOMIC; -import com.google.common.base.Predicate; -import org.apache.gobblin.util.HadoopUtils; import java.io.IOException; import java.net.URI; import java.util.Collection; import java.util.List; +import org.apache.gobblin.util.hadoop.GobblinSequenceFileReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -34,13 +33,15 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.gobblin.util.HadoopUtils; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.util.WritableShimSerialization; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.io.Closer; +import com.google.common.base.Predicate; -import org.apache.gobblin.configuration.State; -import org.apache.gobblin.util.WritableShimSerialization; /** @@ -225,7 +226,7 @@ public class FsStateStore<T extends State> implements StateStore<T> { Closer closer = Closer.create(); try { @SuppressWarnings("deprecation") - SequenceFile.Reader reader = closer.register(new SequenceFile.Reader(this.fs, tablePath, this.conf)); + 
GobblinSequenceFileReader reader = closer.register(new GobblinSequenceFileReader(this.fs, tablePath, this.conf)); try { Text key = new Text(); T state = this.stateClass.newInstance(); @@ -260,7 +261,7 @@ public class FsStateStore<T extends State> implements StateStore<T> { Closer closer = Closer.create(); try { @SuppressWarnings("deprecation") - SequenceFile.Reader reader = closer.register(new SequenceFile.Reader(this.fs, tablePath, this.conf)); + GobblinSequenceFileReader reader = closer.register(new GobblinSequenceFileReader(this.fs, tablePath, this.conf)); try { Text key = new Text(); T state = this.stateClass.newInstance(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90be15f4/gobblin-oozie/src/test/resources/local/gobblin-oozie-example-workflow.xml ---------------------------------------------------------------------- diff --git a/gobblin-oozie/src/test/resources/local/gobblin-oozie-example-workflow.xml b/gobblin-oozie/src/test/resources/local/gobblin-oozie-example-workflow.xml index 637ea74..aac84ed 100644 --- a/gobblin-oozie/src/test/resources/local/gobblin-oozie-example-workflow.xml +++ b/gobblin-oozie/src/test/resources/local/gobblin-oozie-example-workflow.xml @@ -27,7 +27,7 @@ <value>true</value> </property> </configuration> - <main-class>gobblin.runtime.local.CliLocalJobLauncher</main-class> + <main-class>org.apache.gobblin.runtime.local.CliLocalJobLauncher</main-class> <arg>--jobconfig</arg> <arg>${nameNode}/path/to/jobconfig.properties</arg> <arg>--sysconfig</arg> http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90be15f4/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java index fa35921..9da34d7 100644 --- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java @@ -34,6 +34,7 @@ import org.apache.gobblin.metastore.predicates.StateStorePredicate; import org.apache.gobblin.metastore.predicates.StoreNamePredicate; import org.apache.gobblin.runtime.metastore.filesystem.FsDatasetStateStoreEntryManager; import org.apache.gobblin.util.filters.HiddenFilter; +import org.apache.gobblin.util.hadoop.GobblinSequenceFileReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -234,8 +235,24 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp Configuration deserializeConfig = new Configuration(this.conf); WritableShimSerialization.addToHadoopConfiguration(deserializeConfig); - try (@SuppressWarnings("deprecation") SequenceFile.Reader reader = new SequenceFile.Reader(this.fs, tablePath, + try (@SuppressWarnings("deprecation") GobblinSequenceFileReader reader = new GobblinSequenceFileReader(this.fs, tablePath, deserializeConfig)) { + + /** + * Map legacy "gobblin.*" class names to "org.apache.gobblin.*" so that all stateful flows stay backward compatible. + * With this mapping in place, the state-store shim layer is no longer needed for these reads. + * The shim layer implementation is kept temporarily. + */ + String className = reader.getValueClassName(); + if (className.startsWith("gobblin")) { + LOGGER.warn("There's old JobState with no apache package name being read while we cast them at runtime"); + className = "org.apache." 
+ className; + } + + if (!className.equals(JobState.class.getName()) && !className.equals(JobState.DatasetState.class.getName())) { + throw new RuntimeException("There is a mismatch in the Class Type of state in state-store and that in runtime"); + } + // This is necessary for backward compatibility as existing jobs are using the JobState class Object writable = reader.getValueClass() == JobState.class ? new JobState() : new JobState.DatasetState(); @@ -255,7 +272,6 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp throw new IOException(e); } } - return states; } @@ -320,7 +336,7 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp ExecutorsUtils.newDaemonThreadFactory(Optional.of(LOGGER), Optional.of("GetFsDatasetStateStore-"))) .executeAndGetResults(); int maxNumberOfErrorLogs = 10; - IteratorExecutor.logFailures(results, LOGGER, maxNumberOfErrorLogs); + IteratorExecutor.logAndThrowFailures(results, LOGGER, maxNumberOfErrorLogs); } catch (InterruptedException e) { throw new IOException("Failed to get latest dataset states.", e); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90be15f4/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/FsDatasetStateStoreTest.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/FsDatasetStateStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/FsDatasetStateStoreTest.java index 1ff6a07..edeca5b 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/FsDatasetStateStoreTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/FsDatasetStateStoreTest.java @@ -44,6 +44,7 @@ import org.apache.gobblin.metastore.StateStore; import com.google.common.io.Files; + /** * Unit tests for {@link FsDatasetStateStore}. 
* @@ -181,6 +182,31 @@ public class FsDatasetStateStoreTest { Assert.assertEquals(datasetState.getDuration(), 1000); } + /** + * Loads a previously written state store whose class names lack the {@code org.apache} package prefix. + * + * Specifically, the fixture used here is a state store generated by an older gobblin-kafka version, written + * before package names were changed to the {@code org.apache.gobblin} prefix. + * + * Should pass even though the stored class name does not match, because legacy names are rewritten at read time. + * @throws IOException if the state store cannot be read + */ + @Test + public void testGetPreviousDatasetStatesByUrnsNoApache() throws IOException{ + String JOB_NAME_FOR_INCOMPATIBLE_STATE_STORE = "test_failing_job"; + + FsDatasetStateStore _fsDatasetStateStore = + new FsDatasetStateStore(ConfigurationKeys.LOCAL_FS_URI, + "gobblin-runtime/src/test/resources/datasetState"); + + try { + Map<String, JobState.DatasetState> datasetStatesByUrns = + _fsDatasetStateStore.getLatestDatasetStatesByUrns(JOB_NAME_FOR_INCOMPATIBLE_STATE_STORE); + } catch (RuntimeException re){ + Assert.fail("Loading of state store should not fail."); + } + } + @Test public void testGetMetadataForTables() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90be15f4/gobblin-utility/src/main/java/org/apache/gobblin/util/executors/IteratorExecutor.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/executors/IteratorExecutor.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/executors/IteratorExecutor.java index 13dc93e..2ca271e 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/executors/IteratorExecutor.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/executors/IteratorExecutor.java @@ -169,4 +169,35 @@ public class IteratorExecutor<T> { } } + /** + * Logs failures in the output of {@link #executeAndGetResults()} and rethrows the last logged failure to the caller.
+ * @param results output of {@link #executeAndGetResults()} + * @param useLogger logger to write the messages to; this class's logger is used when null. + * @param atMost stop after logging this many failures. + */ + public static <T> void logAndThrowFailures(List<Either<T, ExecutionException>> results, Logger useLogger, int atMost) { + Logger actualLogger = useLogger == null ? log : useLogger; + Iterator<Either<T, ExecutionException>> it = results.iterator(); + int printed = 0; + ExecutionException exc = null; + + while (it.hasNext()) { + Either<T, ExecutionException> nextResult = it.next(); + if (nextResult instanceof Either.Right) { + exc = ((Either.Right<T, ExecutionException>) nextResult).getRight(); + actualLogger.error("Iterator executor failure.", exc); + printed++; + if (printed >= atMost) { + break; + } + } + } + + /** + * If any failure was logged, rethrow the last one, wrapped in a RuntimeException. + */ + if (printed > 0) { + throw new RuntimeException(exc); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90be15f4/gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/GobblinSequenceFileReader.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/GobblinSequenceFileReader.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/GobblinSequenceFileReader.java new file mode 100644 index 0000000..c619fa5 --- /dev/null +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/GobblinSequenceFileReader.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.util.hadoop; + +import java.io.IOException; +import lombok.extern.slf4j.Slf4j; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; + + +/** + * Extends {@link SequenceFile.Reader} mainly to + * override {@link SequenceFile.Reader#getValueClassName()} so that value class names + * written under the legacy {@code gobblin.} package are reported with the {@code org.apache.} prefix. + */ +@Slf4j +public class GobblinSequenceFileReader extends SequenceFile.Reader { + public GobblinSequenceFileReader(FileSystem fs, Path file, + Configuration conf) throws IOException { + super(fs, file, conf); + } + + /** Returns the name of the value class, prefixing legacy {@code gobblin.} names with {@code org.apache.}. */ + public String getValueClassName() { + if (super.getValueClassName().startsWith("gobblin.")) { + log.info("[We have] " + super.getValueClassName()); + return "org.apache." + super.getValueClassName(); + } + + return super.getValueClassName(); + } +}
