This is an automated email from the ASF dual-hosted git repository.
kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a0cef2822d [GOBBLIN-2188] Define `Initializer.AfterInitializeMemento`
for GoT to tunnel state from `GenerateWorkUnits` to `CommitActivity` (#4091)
a0cef2822d is described below
commit a0cef2822d9558c7817bc9ff78f786b8bd58a8db
Author: Kip Kohn <[email protected]>
AuthorDate: Tue Jan 7 02:46:55 2025 -0800
[GOBBLIN-2188] Define `Initializer.AfterInitializeMemento` for GoT to
tunnel state from `GenerateWorkUnits` to `CommitActivity` (#4091)
---
gobblin-api/build.gradle | 2 +
.../gobblin/configuration/ConfigurationKeys.java | 2 +
.../apache/gobblin/initializer/Initializer.java | 105 +++++++++++-
.../initializer/MultiConverterInitializer.java | 20 ++-
.../gobblin/initializer/MultiInitializer.java | 49 +++++-
.../writer/initializer/MultiWriterInitializer.java | 24 ++-
.../gobblin/initializer/MultiInitializerTest.java | 176 +++++++++++++++++++++
gobblin-metastore/build.gradle | 2 +-
.../writer/initializer/JdbcWriterInitializer.java | 61 +++++--
.../ddm/activity/impl/CommitActivityImpl.java | 31 +++-
.../ddm/activity/impl/GenerateWorkUnitsImpl.java | 52 ++++--
.../temporal/util/nesting/work/Workload.java | 3 +-
12 files changed, 479 insertions(+), 48 deletions(-)
diff --git a/gobblin-api/build.gradle b/gobblin-api/build.gradle
index b083bb7f8c..f725db8634 100644
--- a/gobblin-api/build.gradle
+++ b/gobblin-api/build.gradle
@@ -20,6 +20,8 @@ apply plugin: 'java'
dependencies {
compile externalDependency.guava
compile externalDependency.gson
+ compile externalDependency.jacksonCore
+ compile externalDependency.jacksonMapper
compile externalDependency.jasypt
compile externalDependency.jodaTime
compile externalDependency.commonsLang3
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 13a145a3e9..b6569af2d2 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -203,6 +203,7 @@ public class ConfigurationKeys {
public static final String TASK_DATA_ROOT_DIR_KEY = "task.data.root.dir";
public static final String SOURCE_CLASS_KEY = "source.class";
public static final String CONVERTER_CLASSES_KEY = "converter.classes";
+ public static final String CONVERTER_INITIALIZERS_SERIALIZED_MEMENTOS_KEY =
"converter.initializers.serialized.mementos";
public static final String RECORD_STREAM_PROCESSOR_CLASSES_KEY =
"recordStreamProcessor.classes";
public static final String FORK_OPERATOR_CLASS_KEY = "fork.operator.class";
public static final String DEFAULT_FORK_OPERATOR_CLASS =
"org.apache.gobblin.fork.IdentityForkOperator";
@@ -434,6 +435,7 @@ public class ConfigurationKeys {
public static final String WRITER_TRUNCATE_STAGING_TABLE = WRITER_PREFIX +
".truncate.staging.table";
public static final String WRITER_OUTPUT_DIR = WRITER_PREFIX + ".output.dir";
public static final String WRITER_BUILDER_CLASS = WRITER_PREFIX +
".builder.class";
+ public static final String WRITER_INITIALIZER_SERIALIZED_MEMENTO_KEY =
"writer.initializer.serialized.memento";
public static final String DEFAULT_WRITER_BUILDER_CLASS =
"org.apache.gobblin.writer.AvroDataWriterBuilder";
public static final String WRITER_FILE_NAME = WRITER_PREFIX + ".file.name";
public static final String WRITER_FILE_PATH = WRITER_PREFIX + ".file.path";
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/initializer/Initializer.java
b/gobblin-api/src/main/java/org/apache/gobblin/initializer/Initializer.java
index 4454c8ae77..a31698d6f0 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/initializer/Initializer.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/initializer/Initializer.java
@@ -18,14 +18,97 @@
package org.apache.gobblin.initializer;
import java.io.Closeable;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
public interface Initializer extends Closeable {
/**
- * Initialize for the writer.
+ * Marker interface to convey an opaque snapshot of the internal state of
any concrete {@link Initializer}, thus affording state serialization for
+ * eventual "revival" as a new `Initializer` holding equivalent internal
state. {@link #commemorate()} (i.e. create) the memento after
+ * {@link #initialize()} and subsequently {@link
#recall(AfterInitializeMemento)} the state it preserved before performing
{@link #close()}.
+ *
+ * When synchronous and the same instance throughout, the "Initializer
Lifecycle" is:
+ * [concrete `My_T implements Initializer`, instance A] -
+ * `.initialize()`; ==PROCESSING RUNS==; `.close()`;
+ *
+ * When trading `AfterInitializeMemento` between instances (even
memory-space boundaries) it becomes:
+ * [concrete `My_T implements Initializer`, instance A] -
+ * `.initialize()`; `.commemorate()`; ==PERSIST/TRANSMIT MEMENTO==
+ * ==PROCESSING RUNS==;
+ * [concrete `My_T implements Initializer`, instance B] -
+ * ==RECEIVE MEMENTO==; `.recall()`; `.close()`
*
- * @param state
- * @param workUnits WorkUnits created by Source
+ * Both for backwards compatibility and because not every concrete
`Initializer` has internal state worth capturing, not every `Initializer`
+ * impl will implement an `AfterInitializeMemento`. Those that do will
supply a unique impl capturing self-aware impl details of their
+ * `Initializer`.
+ *
+ * An `AfterInitializeMemento` impl needs simply be (de)serializable by
{@link ObjectMapper}.
+ *
+ * An `Initializer` impl with an `AfterInitializeMemento` impl MUST NOT
(re-)process any {@link org.apache.gobblin.source.workunit.WorkUnit}s
+ * during its {@link #close()} method: `WorkUnit` processing MUST occur
entirely within {@link #initialize()}.
+ */
+ @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include =
JsonTypeInfo.As.PROPERTY, property = "@class") // to handle variety of concrete
impls
+ public interface AfterInitializeMemento {
+ static Logger logger =
LoggerFactory.getLogger(AfterInitializeMemento.class);
+
+ /**
+ * Convey attempt to work with a concrete {@link AfterInitializeMemento}
of type other than the single expected companion type known to `forInitializer`.
+ * @see #castAsOrThrow(Class, Initializer)
+ */
+ static class MismatchedMementoException extends RuntimeException {
+ public MismatchedMementoException(AfterInitializeMemento memento,
Class<?> asClass, Initializer forInitializer) {
+ super(String.format("Memento '%s' for Initializer '%s' of class '%s' -
NOT '%s'", memento, forInitializer.getClass().getName(),
+ memento.getClass().getName(), asClass.getName()));
+ }
+ }
+
+ /** stringify as JSON */
+ static String serialize(AfterInitializeMemento memento) {
+ ObjectMapper objectMapper = new ObjectMapper();
+ try {
+ String result = objectMapper.writeValueAsString(memento);
+ logger.info("Serializing AfterInitializeMemento {} as '{}'", memento,
result);
+ return result;
+ } catch (JsonProcessingException e) {
+ logger.error("Failed to serialize AfterInitializeMemento '" + memento
+ "'", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** destringify JSON */
+ static AfterInitializeMemento deserialize(String serialized) {
+ ObjectMapper objectMapper = new ObjectMapper();
+ try {
+ AfterInitializeMemento result = objectMapper.readValue(serialized,
AfterInitializeMemento.class);
+ logger.info("Deserializing AfterInitializeMemento '{}' as {}",
serialized, result);
+ return result;
+ } catch (JsonProcessingException e) {
+ logger.error("Failed to deserialize AfterInitializeMemento '" +
serialized + "'", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** cast `this` (concrete `AfterInitializeMemento`) to `castClass`, else
{@link MismatchedMementoException} */
+ default <T extends AfterInitializeMemento> T castAsOrThrow(Class<T>
castClass, Initializer forInitializer)
+ throws MismatchedMementoException {
+ if (castClass.isAssignableFrom(this.getClass())) {
+ return (T) this;
+ } else {
+ throw new AfterInitializeMemento.MismatchedMementoException(this,
castClass, forInitializer);
+ }
+ }
+ }
+
+ /**
+ * Initialize the writer/converter (e.g. using the state and/or {@link org.apache.gobblin.source.workunit.WorkUnit}s provided when constructing the instance)
+ *
+ * NOTE: an `Initializer` impl with an `AfterInitializeMemento` impl MUST perform ALL of its `WorkUnit` processing here, since its
+ * {@link #close()} may instead run on a different instance, revived via {@link #recall(AfterInitializeMemento)}.
*/
public void initialize();
@@ -33,7 +116,23 @@ public interface Initializer extends Closeable {
* Removed checked exception.
* {@inheritDoc}
* @see java.io.Closeable#close()
+ *
+ * NOTE: An `Initializer` impl with an `AfterInitializeMemento` impl MUST
NOT (re-)process any {@link org.apache.gobblin.source.workunit.WorkUnit}s
+ * during its {@link #close()} method: `WorkUnit` processing MUST occur
entirely within {@link #initialize()}.
*/
@Override
public void close();
+
+ /**
+ * @return the `Initializer`-specific companion memento, to convey internal state after {@link #initialize()}, and as needed to
+ * {@link #close()}; this default returns {@link Optional#empty()}, suiting impls with no internal state worth capturing
+ */
+ default Optional<AfterInitializeMemento> commemorate() {
+ return Optional.empty();
+ }
+
+ /**
+ * "reinitialize" a fresh instance, per `Initializer`-specific companion `memento`, with (equivalent) post {@link #initialize()}
+ * internal state needed to {@link #close()} - i.e. call in place of (NOT in addition to) {@link #initialize()}.
+ *
+ * This default is a no-op, pairing with the default {@link #commemorate()}, which never produces a memento.
+ * @param memento as previously returned by {@link #commemorate()} of an equivalent `Initializer`
+ */
+ default void recall(AfterInitializeMemento memento) {
+ // noop - no internal state to restore
+ }
}
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/converter/initializer/MultiConverterInitializer.java
b/gobblin-core/src/main/java/org/apache/gobblin/converter/initializer/MultiConverterInitializer.java
index 14ac3e1d6b..7fcc53e9e8 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/converter/initializer/MultiConverterInitializer.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/converter/initializer/MultiConverterInitializer.java
@@ -18,27 +18,39 @@
package org.apache.gobblin.converter.initializer;
import java.util.List;
+import java.util.Optional;
import lombok.ToString;
+
import org.apache.gobblin.initializer.Initializer;
import org.apache.gobblin.initializer.MultiInitializer;
+/** Presents multiple {@link ConverterInitializer}s as one, by delegating every call to a wrapped {@link MultiInitializer} */
@ToString
public class MultiConverterInitializer implements ConverterInitializer {
- private final Initializer intializer;
+ private final Initializer initializer;
public MultiConverterInitializer(List<ConverterInitializer>
converterInitializers) {
- this.intializer = new MultiInitializer(converterInitializers);
+ this.initializer = new MultiInitializer(converterInitializers);
}
@Override
public void initialize() {
- this.intializer.initialize();
+ this.initializer.initialize();
}
@Override
public void close() {
- this.intializer.close();
+ this.initializer.close();
+ }
+
+ /** @return the wrapped {@link MultiInitializer}'s memento, which holds an entry per wrapped converter initializer */
+ @Override
+ public Optional<AfterInitializeMemento> commemorate() {
+ return this.initializer.commemorate();
+ }
+
+ /** delegate to the wrapped {@link MultiInitializer}; `memento` should come from an equivalent instance's {@link #commemorate()} */
+ @Override
+ public void recall(AfterInitializeMemento memento) {
+ this.initializer.recall(memento);
}
}
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/initializer/MultiInitializer.java
b/gobblin-core/src/main/java/org/apache/gobblin/initializer/MultiInitializer.java
index 44fa8b4931..69c60b5d7d 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/initializer/MultiInitializer.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/initializer/MultiInitializer.java
@@ -19,18 +19,46 @@ package org.apache.gobblin.initializer;
import java.io.IOException;
import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import lombok.AccessLevel;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
import lombok.ToString;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Streams;
import com.google.common.io.Closer;
/**
- * Wraps multiple writer initializer behind its interface. This is useful when
there're more than one branch.
+ * Wraps multiple writer initializers, which is useful when more than one
branch.
*/
@ToString
public class MultiInitializer implements Initializer {
+
+ /**
+ * Commemorate each (`Optional`) {@link org.apache.gobblin.initializer.Initializer.AfterInitializeMemento} of every wrapped {@link Initializer},
+ * position-for-position with the wrapped initializers; an entry is `null` where that initializer had no memento to commemorate.
+ */
+ @Data
+ @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization
+ @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+ @RequiredArgsConstructor
+ private static class Memento implements AfterInitializeMemento {
+ // WARNING: not possible to use `List<Optional<AfterInitializeMemento>>`, as first attempted: jackson serializes each `Optional`
+ // as a bean (e.g. `{"present":false}`), which then fails to deserialize:
+ // com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "present" (class java.util.Optional),
+ // not marked as ignorable (0 known properties: ])
+ // (through reference chain: org.apache.gobblin.initializer.MultiInitializer$Memento["orderedInitializersMementos"]
+ // ->java.util.ArrayList[0]->java.util.Optional["present"])
+ // the following does NOT fix, probably due to `Optional`'s nesting within `List`:
+ // @JsonIgnoreProperties(ignoreUnknown = true)
+ // hence `null` entries stand in for absent mementos
+ @NonNull private List<AfterInitializeMemento> orderedInitializersMementos;
+ }
+
+
private final List<Initializer> initializers;
private final Closer closer;
@@ -57,4 +85,21 @@ public class MultiInitializer implements Initializer {
throw new RuntimeException(e);
}
}
-}
+
+ /**
+ * @return an (always present) memento listing, position-for-position, each wrapped initializer's memento - `null` for any having none
+ */
+ @Override
+ public Optional<AfterInitializeMemento> commemorate() {
+ List<AfterInitializeMemento> positionalMementos = this.initializers.stream()
+ .map(wrapped -> wrapped.commemorate().orElse(null)) // `null` placeholder keeps positions aligned with `this.initializers`
+ .collect(Collectors.toList());
+ return Optional.of(new MultiInitializer.Memento(positionalMementos));
+ }
+
+ /**
+ * Pass each wrapped initializer the entry at its same position within `memento` (as created by {@link #commemorate()}),
+ * skipping `null` entries, which convey that the initializer had no memento to commemorate.
+ * @throws AfterInitializeMemento.MismatchedMementoException when `memento` is not a {@link MultiInitializer.Memento}
+ * @throws IllegalArgumentException when `memento` holds a different number of entries than there are wrapped initializers
+ */
+ @Override
+ public void recall(AfterInitializeMemento memento) {
+ Memento recollection = memento.castAsOrThrow(MultiInitializer.Memento.class, this);
+ List<AfterInitializeMemento> orderedMementos = recollection.getOrderedInitializersMementos();
+ // fail fast, rather than silently leave some initializers un-`recall`ed (or pair mementos with the wrong initializers)
+ if (orderedMementos.size() != this.initializers.size()) {
+ throw new IllegalArgumentException(String.format("Memento has %d initializer mementos, yet %d initializers wrapped - %s",
+ orderedMementos.size(), this.initializers.size(), this));
+ }
+ // NOTE: a plain loop, NOT `Streams.zip(...).count()`: since Java 9, `count()` may skip executing a SIZED pipeline entirely,
+ // so side effects inside the zipping function would never run
+ for (int i = 0; i < orderedMementos.size(); i++) {
+ AfterInitializeMemento nullableInitializerMemento = orderedMementos.get(i);
+ if (nullableInitializerMemento != null) {
+ this.initializers.get(i).recall(nullableInitializerMemento);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/writer/initializer/MultiWriterInitializer.java
b/gobblin-core/src/main/java/org/apache/gobblin/writer/initializer/MultiWriterInitializer.java
index 7b3ba3e27b..38c5431353 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/writer/initializer/MultiWriterInitializer.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/writer/initializer/MultiWriterInitializer.java
@@ -17,31 +17,41 @@
package org.apache.gobblin.writer.initializer;
-import org.apache.gobblin.initializer.Initializer;
-import org.apache.gobblin.initializer.MultiInitializer;
-
+import java.util.Optional;
import java.util.List;
import lombok.ToString;
+import org.apache.gobblin.initializer.Initializer;
+import org.apache.gobblin.initializer.MultiInitializer;
+
+/** Presents multiple {@link WriterInitializer}s as one, by delegating every call to a wrapped {@link MultiInitializer} */
@ToString
public class MultiWriterInitializer implements WriterInitializer {
- private final Initializer intializer;
+ private final Initializer initializer;
public MultiWriterInitializer(List<WriterInitializer> writerInitializers) {
- this.intializer = new MultiInitializer(writerInitializers);
+ this.initializer = new MultiInitializer(writerInitializers);
}
@Override
public void initialize() {
- this.intializer.initialize();
+ this.initializer.initialize();
}
@Override
public void close() {
- this.intializer.close();
+ this.initializer.close();
}
+ /** @return the wrapped {@link MultiInitializer}'s memento, which holds an entry per wrapped writer initializer */
+ @Override
+ public Optional<AfterInitializeMemento> commemorate() {
+ return this.initializer.commemorate();
+ }
+
+ /** delegate to the wrapped {@link MultiInitializer}; `memento` should come from an equivalent instance's {@link #commemorate()} */
+ @Override
+ public void recall(AfterInitializeMemento memento) {
+ this.initializer.recall(memento);
+ }
}
diff --git
a/gobblin-core/src/test/java/org/apache/gobblin/initializer/MultiInitializerTest.java
b/gobblin-core/src/test/java/org/apache/gobblin/initializer/MultiInitializerTest.java
new file mode 100644
index 0000000000..79d6514a0b
--- /dev/null
+++
b/gobblin-core/src/test/java/org/apache/gobblin/initializer/MultiInitializerTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.initializer;
+
+import java.util.Arrays;
+import java.util.Optional;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+
+/**
+ * Test {@link MultiInitializer}
+ *
+ * NOTE: each concrete initializer numbers its instances via a static counter, so state always differs between instances
+ */
+public class MultiInitializerTest {
+ /** Concrete Initializer A - implements `AfterInitializeMemento` */
+ private static class InitializerImplA implements Initializer {
+ private static int instanceCounter = 0;
+
+ @Data
+ @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+ @RequiredArgsConstructor
+ private static class Memento implements Initializer.AfterInitializeMemento {
+ @NonNull private String state;
+ }
+
+ @Getter private String state;
+
+ @Override
+ public void initialize() {
+ this.state = "initialized-" + (++instanceCounter);
+ }
+
+ @Override
+ public void close() {
+ // noop
+ }
+
+ @Override
+ public Optional<AfterInitializeMemento> commemorate() {
+ return Optional.of(new Memento(this.state));
+ }
+
+ @Override
+ public void recall(AfterInitializeMemento memento) {
+ Memento recollection = memento.castAsOrThrow(Memento.class, this);
+ this.state = recollection.getState();
+ }
+ }
+
+
+ /** Concrete Initializer B - DOES NOT implement `AfterInitializeMemento`! */
+ private static class InitializerImplBNoMemento implements Initializer {
+ private static int instanceCounter = 0;
+
+ @Getter private String state;
+
+ @Override
+ public void initialize() {
+ this.state = "ignore-" + (++instanceCounter);
+ }
+
+ @Override
+ public void close() {
+ // noop
+ }
+ }
+
+
+ /** Concrete Initializer C - implements `AfterInitializeMemento` */
+ private static class InitializerImplC implements Initializer {
+ private static int instanceCounter = 0;
+
+ @Data
+ @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+ private static class MyMemento implements AfterInitializeMemento {
+ // NOTE: lombok's `@NonNull` is meaningless on a primitive (and draws a warning), so rather than misuse it to enroll
+ // `state` in a `@RequiredArgsConstructor`, spell out the one-arg constructor
+ private int state;
+
+ MyMemento(int state) {
+ this.state = state;
+ }
+ }
+
+ @Getter private int state;
+
+ @Override
+ public void initialize() {
+ this.state = 41 + (++instanceCounter);
+ }
+
+ @Override
+ public void close() {
+ // noop
+ }
+
+ @Override
+ public Optional<AfterInitializeMemento> commemorate() {
+ return Optional.of(new MyMemento(this.state));
+ }
+
+ @Override
+ public void recall(AfterInitializeMemento memento) {
+ MyMemento recollection = memento.castAsOrThrow(MyMemento.class, this);
+ this.state = recollection.getState();
+ }
+ }
+
+
+ @Test
+ public void testMementoCommemorateToSerializeAndDeserializeForRecall() {
+ // create "first generation" of concrete initializers
+ InitializerImplA initializerA_1 = new InitializerImplA();
+ InitializerImplBNoMemento initializerB_1 = new InitializerImplBNoMemento();
+ InitializerImplC initializerC_1 = new InitializerImplC();
+
+ // create the 1st-gen `MultiInitializer`; `initialize` all wrapped initializers
+ MultiInitializer multiInitializer1G = new MultiInitializer(Arrays.asList(initializerA_1, initializerB_1, initializerC_1));
+ multiInitializer1G.initialize();
+
+ // `commemorate` and `serialize` 1st-gen state
+ Optional<Initializer.AfterInitializeMemento> optMemento1G = multiInitializer1G.commemorate();
+ Assert.assertTrue(optMemento1G.isPresent());
+ String serializedMemento = Initializer.AfterInitializeMemento.serialize(optMemento1G.get());
+
+ // create a new 2nd-gen `MultiInitializer` with a "second generation" of concrete initializers... but DO NOT `initialize` them!
+ InitializerImplA initializerA_2 = new InitializerImplA();
+ InitializerImplBNoMemento initializerB_2 = new InitializerImplBNoMemento();
+ InitializerImplC initializerC_2 = new InitializerImplC();
+ MultiInitializer multiInitializer2G = new MultiInitializer(Arrays.asList(initializerA_2, initializerB_2, initializerC_2));
+
+ // verify that state differs between 1st-gen and 2nd-gen `Initializer`s
+ Assert.assertNotEquals(initializerA_1.getState(), initializerA_2.getState());
+ Assert.assertNotEquals(initializerB_1.getState(), initializerB_2.getState());
+ Assert.assertNotEquals(initializerC_1.getState(), initializerC_2.getState());
+
+ try {
+ // verify not possible to `commemorate` prior to `recall()`
+ multiInitializer2G.commemorate();
+ Assert.fail("`commemorate()` somehow possible even before `Initializer.initialize()` or `recall()` - despite `@NonNull` annotation on `state`");
+ } catch (NullPointerException npe) {
+ // expected
+ }
+
+ // next, `deserialize` 1st-gen state and `recall` it to the (un-`initialize`d) 2nd-gen `MultiInitializer`
+ Initializer.AfterInitializeMemento deserializedMemento = Initializer.AfterInitializeMemento.deserialize(serializedMemento);
+ multiInitializer2G.recall(deserializedMemento);
+ Optional<Initializer.AfterInitializeMemento> optMemento2G = multiInitializer2G.commemorate();
+ Assert.assertTrue(optMemento2G.isPresent());
+
+ // verify that post-`recall`ed memento equivalent to post-`initialize`d one
+ Assert.assertEquals(optMemento1G.get(), optMemento2G.get());
+
+ // WARNING: in real code, DO NOT `initialize` following `recall`, as it would reset the state of the wrapped initializers, negating the `recall`
+ multiInitializer2G.initialize();
+ Optional<Initializer.AfterInitializeMemento> optMemento2G_alt = multiInitializer2G.commemorate();
+ Assert.assertTrue(optMemento2G_alt.isPresent());
+ // verify not simply the case that mementos always equal
+ Assert.assertNotEquals(optMemento2G.get(), optMemento2G_alt.get());
+ }
+}
diff --git a/gobblin-metastore/build.gradle b/gobblin-metastore/build.gradle
index d5b9bc1a6d..063a0c0a53 100644
--- a/gobblin-metastore/build.gradle
+++ b/gobblin-metastore/build.gradle
@@ -45,7 +45,7 @@ dependencies {
testCompile externalDependency.slf4jToLog4j
}
-// Begin HACK to get around POM being depenendent on the (empty)
gobblin-rest-api instead of gobblin-rest-api-rest-client
+// Begin HACK to get around POM being dependent on the (empty)
gobblin-rest-api instead of gobblin-rest-api-rest-client
def installer = install.repositories.mavenInstaller
[installer]*.pom*.whenConfigured {pom ->
pom.dependencies.find {dep -> dep.groupId == project.group &&
dep.artifactId == 'gobblin-rest-api' }.artifactId =
'gobblin-rest-api-rest-client'
diff --git
a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java
b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java
index 8226de7698..0f78ff31b6 100644
---
a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java
+++
b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java
@@ -17,28 +17,23 @@
package org.apache.gobblin.writer.initializer;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.publisher.JdbcPublisher;
-import org.apache.gobblin.source.workunit.WorkUnit;
-import org.apache.gobblin.util.ForkOperatorUtils;
-import org.apache.gobblin.util.jdbc.DataSourceBuilder;
-import org.apache.gobblin.writer.Destination;
-import org.apache.gobblin.writer.Destination.DestinationType;
-import org.apache.gobblin.writer.commands.JdbcWriterCommands;
-import org.apache.gobblin.writer.commands.JdbcWriterCommandsFactory;
-import org.apache.gobblin.source.extractor.JobCommitPolicy;
-
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
+import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.Setter;
import lombok.ToString;
import org.apache.commons.lang3.StringUtils;
@@ -50,12 +45,37 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.publisher.JdbcPublisher;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.ForkOperatorUtils;
+import org.apache.gobblin.util.jdbc.DataSourceBuilder;
+import org.apache.gobblin.writer.Destination;
+import org.apache.gobblin.writer.Destination.DestinationType;
+import org.apache.gobblin.writer.commands.JdbcWriterCommands;
+import org.apache.gobblin.writer.commands.JdbcWriterCommandsFactory;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
+
/**
* Initialize for JDBC writer and also performs clean up.
*/
@ToString
public class JdbcWriterInitializer implements WriterInitializer {
+ /**
+ * Commemorate all and exactly those fields for which to preserve instance state - the name(s) of the staging DB tables, which
+ * {@link #close()} drops (when created by this initializer) or truncates (when supplied by the user)
+ */
+ @Data
+ @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization
+ @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+ @AllArgsConstructor
+ private static class Memento implements AfterInitializeMemento {
+ // NOTE: as this clearly MAY be `null`, DO NOT mark `@NonNull`, which would fail construction with:
+ // userCreatedStagingTable is marked non-null but is null
+ private String userCreatedStagingTable;
+ @NonNull private Set<String> createdStagingTables;
+ }
+
+
private static final Logger LOG =
LoggerFactory.getLogger(JdbcWriterInitializer.class);
private static final String STAGING_TABLE_FORMAT = "stage_%d";
private static final int NAMING_STAGING_TABLE_TRIAL = 10;
@@ -95,7 +115,7 @@ public class JdbcWriterInitializer implements
WriterInitializer {
* Drop table if it's created by this instance.
* Truncate staging tables passed by user.
* {@inheritDoc}
- * @see org.apache.gobblin.Initializer#close()
+ * @see org.apache.gobblin.initializer.Initializer#close()
*/
@Override
public void close() {
@@ -200,7 +220,6 @@ public class JdbcWriterInitializer implements
WriterInitializer {
* 3.1. Create staging table with unique name.
* 3.2. Try to drop and recreate the table to confirm if we can drop it
later.
* 4. Update Workunit state with staging table information.
- * @param state
*/
@Override
public void initialize() {
@@ -225,7 +244,7 @@ public class JdbcWriterInitializer implements
WriterInitializer {
i++;
if (isSkipStaging) {
- LOG.info("User chose to skip staing table on branch " +
this.branchId + " workunit " + i);
+ LOG.info("User chose to skip staging table on branch " +
this.branchId + " workunit " + i);
wu.setProp(stagingTableKey, publishTable);
if (i == 0) {
@@ -277,6 +296,18 @@ public class JdbcWriterInitializer implements
WriterInitializer {
}
}
+ @Override
+ public Optional<AfterInitializeMemento> commemorate() {
+ return Optional.of(new
JdbcWriterInitializer.Memento(this.userCreatedStagingTable,
Sets.newHashSet(this.createdStagingTables)));
+ }
+
+ /** Restore, from `memento`, the staging-table bookkeeping {@link #close()} needs, keeping a private copy of the table set */
+ @Override
+ public void recall(AfterInitializeMemento memento) {
+ JdbcWriterInitializer.Memento restored = memento.castAsOrThrow(JdbcWriterInitializer.Memento.class, this);
+ this.userCreatedStagingTable = restored.getUserCreatedStagingTable();
+ this.createdStagingTables = Sets.newHashSet(restored.getCreatedStagingTables());
+ }
+
private JdbcWriterCommands createJdbcWriterCommands(Connection conn) {
String destKey =
ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.WRITER_DESTINATION_TYPE_KEY,
this.branches, this.branchId);
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
index 6346e08df3..b43c079c10 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -40,7 +40,7 @@ import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
-
+import com.google.common.io.Closer;
import io.temporal.failure.ApplicationFailure;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
@@ -48,6 +48,8 @@ import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.commit.DeliverySemantics;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
+import org.apache.gobblin.initializer.Initializer;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.JobContext;
@@ -58,6 +60,8 @@ import org.apache.gobblin.runtime.TaskStateCollectorService;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
import
org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
+import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
+import org.apache.gobblin.source.workunit.WorkUnitStream;
import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.ddm.work.CommitStats;
@@ -69,6 +73,7 @@ import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
@Slf4j
@@ -76,7 +81,7 @@ public class CommitActivityImpl implements CommitActivity {
static int DEFAULT_NUM_DESERIALIZATION_THREADS = 10;
static int DEFAULT_NUM_COMMIT_THREADS = 1;
- static String UNDEFINED_JOB_NAME = "<job_name_stub>";
+ static String UNDEFINED_JOB_NAME = "<<UNKNOWN JOB NAME>>";
@Override
public CommitStats commit(WUProcessingSpec workSpec) {
@@ -105,6 +110,22 @@ public class CommitActivityImpl implements CommitActivity {
} catch (FailedDatasetUrnsException exception) {
log.warn("Some datasets failed to be committed, proceeding with
publishing commit step", exception);
optFailure = Optional.of(exception);
+ } finally {
+ // if Work Discovery transmitted any writer/converter
`Initializer.AfterInitializeMemento`s within `jobState`, deserialize them now to
+ // `.recall()` and "re-initialize" equivalent writer and/or
converter(s) `Initializer`s, to complete their `.close()`
+ // NOTE: the "revived" `Initializer`s are constructed with empty
placeholder WUs
+ Closer closer = Closer.create(); // (purely to suppress exceptions)
+
Optional.ofNullable(jobState.getProp(ConfigurationKeys.WRITER_INITIALIZER_SERIALIZED_MEMENTO_KEY)).map(mementoProp
->
+ Initializer.AfterInitializeMemento.deserialize(mementoProp)
+ ).ifPresent(memento ->
+ closer.register(WriterInitializerFactory.newInstace(jobState,
createEmptyWorkUnitStream())).recall(memento)
+ );
+
Optional.ofNullable(jobState.getProp(ConfigurationKeys.CONVERTER_INITIALIZERS_SERIALIZED_MEMENTOS_KEY)).map(mementoProp
->
+ Initializer.AfterInitializeMemento.deserialize(mementoProp)
+ ).ifPresent(memento ->
+ closer.register(ConverterInitializerFactory.newInstance(jobState,
createEmptyWorkUnitStream())).recall(memento)
+ );
+ closer.close();
}
boolean shouldIncludeFailedTasks =
PropertiesUtils.getPropAsBoolean(jobState.getProperties(),
ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");
@@ -247,4 +268,8 @@ public class CommitActivityImpl implements CommitActivity {
private static String calcCommitId(WUProcessingSpec workSpec) {
return new Path(workSpec.getWorkUnitsDir()).getParent().getName();
}
-}
\ No newline at end of file
+
+ private static WorkUnitStream createEmptyWorkUnitStream() {
+ return new BasicWorkUnitStream.Builder(Lists.newArrayList()).build();
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index 63d9b6b1ea..33f60bd779 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -40,7 +41,9 @@ import io.temporal.failure.ApplicationFailure;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.converter.initializer.ConverterInitializer;
import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
+import org.apache.gobblin.initializer.Initializer;
import org.apache.gobblin.destination.DestinationDatasetHandlerService;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
@@ -63,6 +66,7 @@ import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
+import org.apache.gobblin.writer.initializer.WriterInitializer;
import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
@@ -106,6 +110,16 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
}
+ /** [Internal, impl class] Intermediate result of generated work units with
insightful analysis extracted by pre-processing them */
+ @Data
+ private static class WorkUnitsWithInsights {
+ private final List<WorkUnit> workUnits;
+ private final Set<String> pathsToCleanUp;
+ private final Optional<Initializer.AfterInitializeMemento>
optWriterMemento;
+ private final Optional<Initializer.AfterInitializeMemento>
optConverterMemento;
+ }
+
+
@Override
public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps,
EventSubmitterContext eventSubmitterContext) {
// TODO: decide whether to acquire a job lock (as MR did)!
@@ -128,8 +142,8 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
FileSystem fs = closer.register(JobStateUtils.openFileSystem(jobState));
fs.mkdirs(workDirRoot);
- Set<String> pathsToCleanUp = new HashSet<>();
- List<WorkUnit> workUnits =
generateWorkUnitsForJobStateAndCollectCleanupPaths(jobState,
eventSubmitterContext, closer, pathsToCleanUp);
+ WorkUnitsWithInsights genWUsInsights =
generateWorkUnitsForJobStateAndCollectCleanupPaths(jobState,
eventSubmitterContext, closer);
+ List<WorkUnit> workUnits = genWUsInsights.getWorkUnits();
int numSizeSummaryQuantiles =
getConfiguredNumSizeSummaryQuantiles(jobState);
WorkUnitsSizeSummary wuSizeSummary =
digestWorkUnitsSize(workUnits).asSizeSummary(numSizeSummaryQuantiles);
@@ -138,11 +152,20 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
// exceed available memory and this activity execution were to fail, a
subsequent re-attempt would know the amount of work, to guide re-config/attempt
createWorkPreparedSizeDistillationTimer(wuSizeSummary,
eventSubmitterContext).stop();
+ // add any (serialized) mementos before serializing `jobState`, for
later `recall` during `CommitActivityImpl`
+ genWUsInsights.optWriterMemento.ifPresent(memento ->
+
jobState.setProp(ConfigurationKeys.WRITER_INITIALIZER_SERIALIZED_MEMENTO_KEY,
+ Initializer.AfterInitializeMemento.serialize(memento))
+ );
+ genWUsInsights.optConverterMemento.ifPresent(memento ->
+
jobState.setProp(ConfigurationKeys.CONVERTER_INITIALIZERS_SERIALIZED_MEMENTOS_KEY,
+ Initializer.AfterInitializeMemento.serialize(memento))
+ );
JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs);
JobStateUtils.writeJobState(jobState, workDirRoot, fs); // ATTENTION:
the writing of `JobState` after all WUs signifies WU gen+serialization now
complete
String sourceClassName = JobStateUtils.getSourceClassName(jobState);
- return new GenerateWorkUnitsResult(jobState.getTaskCount(),
sourceClassName, wuSizeSummary, pathsToCleanUp);
+ return new GenerateWorkUnitsResult(jobState.getTaskCount(),
sourceClassName, wuSizeSummary, genWUsInsights.getPathsToCleanUp());
} catch (ReflectiveOperationException roe) {
String errMsg = "Unable to construct a source for generating workunits
for job " + jobState.getJobId();
log.error(errMsg, roe);
@@ -157,8 +180,7 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
}
}
- protected List<WorkUnit>
generateWorkUnitsForJobStateAndCollectCleanupPaths(JobState jobState,
EventSubmitterContext eventSubmitterContext, Closer closer,
- Set<String> pathsToCleanUp)
+ protected WorkUnitsWithInsights
generateWorkUnitsForJobStateAndCollectCleanupPaths(JobState jobState,
EventSubmitterContext eventSubmitterContext, Closer closer)
throws ReflectiveOperationException {
// report (timer) metrics for "Work Discovery", *planning only* - NOT
including WU prep, like serialization, `DestinationDatasetHandlerService`ing,
etc.
// IMPORTANT: for accurate timing, SEPARATELY emit
`.createWorkPreparationTimer`, to record time prior to measuring the WU size
required for that one
@@ -179,18 +201,20 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
if (!workUnitStream.getWorkUnits().hasNext()) { // no work unit to run:
entirely normal result (not a failure)
log.warn("No work units created for job " + jobState.getJobId());
- return Lists.newArrayList();
+ return new WorkUnitsWithInsights(Lists.newArrayList(), new HashSet<>(),
Optional.empty(), Optional.empty());
}
boolean canCleanUpTempDirs = false; // unlike `AbstractJobLauncher`
running the job end-to-end, this is Work Discovery only, so WAY TOO SOON for
cleanup
DestinationDatasetHandlerService datasetHandlerService = closer.register(
new DestinationDatasetHandlerService(jobState, canCleanUpTempDirs,
eventSubmitterContext.create()));
WorkUnitStream handledWorkUnitStream =
datasetHandlerService.executeHandlers(workUnitStream);
- pathsToCleanUp.addAll(calculateWorkDirsToCleanup(handledWorkUnitStream));
- // initialize writer and converter(s)
- // TODO: determine whether registration here is effective, or the
lifecycle of this activity is too brief (as is likely!)
- closer.register(WriterInitializerFactory.newInstace(jobState,
handledWorkUnitStream)).initialize();
- closer.register(ConverterInitializerFactory.newInstance(jobState,
handledWorkUnitStream)).initialize();
+ Set<String> pathsToCleanUp = new
HashSet<>(calculateWorkDirsToCleanup(handledWorkUnitStream));
+
+ // initialize writer and converter(s), but DO NOT `.close()` them here;
rather `.commemorate()` for later `.recall()` during Commit
+ WriterInitializer writerInitializer =
WriterInitializerFactory.newInstace(jobState, handledWorkUnitStream);
+ writerInitializer.initialize();
+ ConverterInitializer converterInitializer =
ConverterInitializerFactory.newInstance(jobState, handledWorkUnitStream);
+ converterInitializer.initialize();
// update jobState before it gets serialized
long startTime = System.currentTimeMillis();
@@ -206,7 +230,11 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
// dump the work unit if tracking logs are enabled (post any
materialization for counting)
WorkUnitStream trackedWorkUnitStream =
AbstractJobLauncher.addWorkUnitTrackingPerConfig(preparedWorkUnitStream,
jobState, log);
- return AbstractJobLauncher.materializeWorkUnitList(trackedWorkUnitStream);
+ return new WorkUnitsWithInsights(
+ AbstractJobLauncher.materializeWorkUnitList(trackedWorkUnitStream),
+ pathsToCleanUp,
+ writerInitializer.commemorate(),
+ converterInitializer.commemorate());
}
protected static Set<String> calculateWorkDirsToCleanup(WorkUnitStream
workUnitStream) {
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/Workload.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/Workload.java
index 239825f7cf..0bb9192cc6 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/Workload.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/Workload.java
@@ -17,10 +17,11 @@
package org.apache.gobblin.temporal.util.nesting.work;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
import java.util.Iterator;
import java.util.Optional;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
/**
* {@link Workload} models a logical collection of homogenous inputs over
which a "foreach" operation can asynchronously apply