This is an automated email from the ASF dual-hosted git repository.

kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a0cef2822d [GOBBLIN-2188] Define `Initializer.AfterInitializeMemento` for GoT to tunnel state from `GenerateWorkUnits` to `CommitActivity` (#4091)
a0cef2822d is described below

commit a0cef2822d9558c7817bc9ff78f786b8bd58a8db
Author: Kip Kohn <[email protected]>
AuthorDate: Tue Jan 7 02:46:55 2025 -0800

    [GOBBLIN-2188] Define `Initializer.AfterInitializeMemento` for GoT to tunnel state from `GenerateWorkUnits` to `CommitActivity` (#4091)
---
 gobblin-api/build.gradle                           |   2 +
 .../gobblin/configuration/ConfigurationKeys.java   |   2 +
 .../apache/gobblin/initializer/Initializer.java    | 105 +++++++++++-
 .../initializer/MultiConverterInitializer.java     |  20 ++-
 .../gobblin/initializer/MultiInitializer.java      |  49 +++++-
 .../writer/initializer/MultiWriterInitializer.java |  24 ++-
 .../gobblin/initializer/MultiInitializerTest.java  | 176 +++++++++++++++++++++
 gobblin-metastore/build.gradle                     |   2 +-
 .../writer/initializer/JdbcWriterInitializer.java  |  61 +++++--
 .../ddm/activity/impl/CommitActivityImpl.java      |  31 +++-
 .../ddm/activity/impl/GenerateWorkUnitsImpl.java   |  52 ++++--
 .../temporal/util/nesting/work/Workload.java       |   3 +-
 12 files changed, 479 insertions(+), 48 deletions(-)

diff --git a/gobblin-api/build.gradle b/gobblin-api/build.gradle
index b083bb7f8c..f725db8634 100644
--- a/gobblin-api/build.gradle
+++ b/gobblin-api/build.gradle
@@ -20,6 +20,8 @@ apply plugin: 'java'
 dependencies {
     compile externalDependency.guava
     compile externalDependency.gson
+    compile externalDependency.jacksonCore
+    compile externalDependency.jacksonMapper
     compile externalDependency.jasypt
     compile externalDependency.jodaTime
     compile externalDependency.commonsLang3
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 13a145a3e9..b6569af2d2 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -203,6 +203,7 @@ public class ConfigurationKeys {
   public static final String TASK_DATA_ROOT_DIR_KEY = "task.data.root.dir";
   public static final String SOURCE_CLASS_KEY = "source.class";
   public static final String CONVERTER_CLASSES_KEY = "converter.classes";
+  public static final String CONVERTER_INITIALIZERS_SERIALIZED_MEMENTOS_KEY = 
"converter.initializers.serialized.mementos";
   public static final String RECORD_STREAM_PROCESSOR_CLASSES_KEY = 
"recordStreamProcessor.classes";
   public static final String FORK_OPERATOR_CLASS_KEY = "fork.operator.class";
   public static final String DEFAULT_FORK_OPERATOR_CLASS = 
"org.apache.gobblin.fork.IdentityForkOperator";
@@ -434,6 +435,7 @@ public class ConfigurationKeys {
   public static final String WRITER_TRUNCATE_STAGING_TABLE = WRITER_PREFIX + 
".truncate.staging.table";
   public static final String WRITER_OUTPUT_DIR = WRITER_PREFIX + ".output.dir";
   public static final String WRITER_BUILDER_CLASS = WRITER_PREFIX + 
".builder.class";
+  public static final String WRITER_INITIALIZER_SERIALIZED_MEMENTO_KEY = 
"writer.initializer.serialized.memento";
   public static final String DEFAULT_WRITER_BUILDER_CLASS = 
"org.apache.gobblin.writer.AvroDataWriterBuilder";
   public static final String WRITER_FILE_NAME = WRITER_PREFIX + ".file.name";
   public static final String WRITER_FILE_PATH = WRITER_PREFIX + ".file.path";
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/initializer/Initializer.java b/gobblin-api/src/main/java/org/apache/gobblin/initializer/Initializer.java
index 4454c8ae77..a31698d6f0 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/initializer/Initializer.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/initializer/Initializer.java
@@ -18,14 +18,97 @@
 package org.apache.gobblin.initializer;
 
 import java.io.Closeable;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 
 public interface Initializer extends Closeable {
 
   /**
-   * Initialize for the writer.
+   * Marker interface to convey an opaque snapshot of the internal state of 
any concrete {@link Initializer}, thus affording state serialization for
+   * eventual "revival" as a new `Initializer` holding equivalent internal 
state.  {@link #commemorate()} (i.e. create) the memento after
+   * {@link #initialize()} and subsequently {@link 
#recall(AfterInitializeMemento)} the state it preserved before performing 
{@link #close()}.
+   *
+   * When synchronous and the same instance throughout, the "Initializer 
Lifecycle" is:
+   *   [concrete `My_T implements Initializer`, instance A] -
+   *       `.initialize()`; ==PROCESSING RUNS==; `.close()`;
+   *
+   * When trading `AfterInitializeMemento` between instances (even 
memory-space boundaries) it becomes:
+   *   [concrete `My_T implements Initializer`, instance A] -
+   *       `.initialize()`; `.commemorate()`; ==PERSIST/TRANSMIT MEMENTO==
+   *   ==PROCESSING RUNS==;
+   *   [concrete `My_T implements Initializer`, instance B] -
+   *       ==RECEIVE MEMENTO==; `.recall()`; `.close()`
    *
-   * @param state
-   * @param workUnits WorkUnits created by Source
+   * Both for backwards compatibility and because not every concrete 
`Initializer` has internal state worth capturing, not every `Initializer`
+   * impl will implement an `AfterInitializeMemento`.  Those that do will 
supply a unique impl capturing self-aware impl details of their
+   * `Initializer`.
+   *
+   * An `AfterInitializeMemento` impl needs simply be (de)serializable by 
{@link ObjectMapper}.
+   *
+   * An `Initializer` impl with an `AfterInitializeMemento` impl MUST NOT 
(re-)process any {@link org.apache.gobblin.source.workunit.WorkUnit}s
+   * during its {@link #close()} method: `WorkUnit` processing MUST occur 
entirely within {@link #initialize()}.
+   */
+  @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = 
JsonTypeInfo.As.PROPERTY, property = "@class") // to handle variety of concrete 
impls
+  public interface AfterInitializeMemento {
+    static Logger logger = 
LoggerFactory.getLogger(AfterInitializeMemento.class);
+
+    /**
+     * Convey attempt to work with a concrete {@link AfterInitializeMemento} 
of type other than the single expected companion type known to `forInitializer`.
+     * @see #castAsOrThrow(Class, Initializer)
+     */
+    static class MismatchedMementoException extends RuntimeException {
+      public MismatchedMementoException(AfterInitializeMemento memento, 
Class<?> asClass, Initializer forInitializer) {
+        super(String.format("Memento '%s' for Initializer '%s' of class '%s' - 
NOT '%s'", memento, forInitializer.getClass().getName(),
+            memento.getClass().getName(), asClass.getName()));
+      }
+    }
+
+    /** stringify as JSON */
+    static String serialize(AfterInitializeMemento memento) {
+      ObjectMapper objectMapper = new ObjectMapper();
+      try {
+        String result = objectMapper.writeValueAsString(memento);
+        logger.info("Serializing AfterInitializeMemento {} as '{}'", memento, 
result);
+        return result;
+      } catch (JsonProcessingException e) {
+        logger.error("Failed to serialize AfterInitializeMemento '" + memento 
+ "'", e);
+        throw new RuntimeException(e);
+      }
+    }
+
+    /** destringify JSON */
+    static AfterInitializeMemento deserialize(String serialized) {
+      ObjectMapper objectMapper = new ObjectMapper();
+      try {
+        AfterInitializeMemento result = objectMapper.readValue(serialized, 
AfterInitializeMemento.class);
+        logger.info("Deserializing AfterInitializeMemento '{}' as {}", 
serialized, result);
+        return result;
+      } catch (JsonProcessingException e) {
+        logger.error("Failed to deserialize AfterInitializeMemento '" + 
serialized + "'", e);
+        throw new RuntimeException(e);
+      }
+    }
+
+    /** cast `this` (concrete `AfterInitializeMemento`) to `castClass`, else 
{@link MismatchedMementoException} */
+    default <T extends AfterInitializeMemento> T castAsOrThrow(Class<T> 
castClass, Initializer forInitializer)
+        throws MismatchedMementoException {
+      if (castClass.isAssignableFrom(this.getClass())) {
+        return (T) this;
+      } else {
+        throw new AfterInitializeMemento.MismatchedMementoException(this, 
castClass, forInitializer);
+      }
+    }
+  }
+
+  /**
+   * Initialize the writer/converter (e.g. using the state and/or {@link 
org.apache.gobblin.source.workunit.WorkUnit}s provided when constructing the 
instance)
    */
   public void initialize();
 
@@ -33,7 +116,23 @@ public interface Initializer extends Closeable {
    * Removed checked exception.
    * {@inheritDoc}
    * @see java.io.Closeable#close()
+   *
+   * NOTE: An `Initializer` impl with an `AfterInitializeMemento` impl MUST 
NOT (re-)process any {@link org.apache.gobblin.source.workunit.WorkUnit}s
+   * during its {@link #close()} method: `WorkUnit` processing MUST occur 
entirely within {@link #initialize()}.
    */
   @Override
   public void close();
+
+  /** @return the `Initializer`-specific companion memento, to convey internal 
state after {@link #initialize()}, and as needed to {@link #close()} */
+  default Optional<AfterInitializeMemento> commemorate() {
+    return Optional.empty();
+  }
+
+  /**
+   * "reinitialize" a fresh instance, per `Initializer`-specific companion 
`memento`, with (equivalent) post {@link #initialize()} internal state needed
+   * to {@link #close()}
+   */
+  default void recall(AfterInitializeMemento memento) {
+    // noop
+  }
 }
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/initializer/MultiConverterInitializer.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/initializer/MultiConverterInitializer.java
index 14ac3e1d6b..7fcc53e9e8 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/initializer/MultiConverterInitializer.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/initializer/MultiConverterInitializer.java
@@ -18,27 +18,39 @@
 package org.apache.gobblin.converter.initializer;
 
 import java.util.List;
+import java.util.Optional;
 
 import lombok.ToString;
+
 import org.apache.gobblin.initializer.Initializer;
 import org.apache.gobblin.initializer.MultiInitializer;
 
 
 @ToString
 public class MultiConverterInitializer implements ConverterInitializer {
-  private final Initializer intializer;
+  private final Initializer initializer;
 
   public MultiConverterInitializer(List<ConverterInitializer> 
converterInitializers) {
-    this.intializer = new MultiInitializer(converterInitializers);
+    this.initializer = new MultiInitializer(converterInitializers);
   }
 
   @Override
   public void initialize() {
-    this.intializer.initialize();
+    this.initializer.initialize();
   }
 
   @Override
   public void close() {
-    this.intializer.close();
+    this.initializer.close();
+  }
+
+  @Override
+  public Optional<AfterInitializeMemento> commemorate() {
+    return this.initializer.commemorate();
+  }
+
+  @Override
+  public void recall(AfterInitializeMemento memento) {
+    this.initializer.recall(memento);
   }
 }
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/initializer/MultiInitializer.java b/gobblin-core/src/main/java/org/apache/gobblin/initializer/MultiInitializer.java
index 44fa8b4931..69c60b5d7d 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/initializer/MultiInitializer.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/initializer/MultiInitializer.java
@@ -19,18 +19,46 @@ package org.apache.gobblin.initializer;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
+import lombok.AccessLevel;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
 import lombok.ToString;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Streams;
 import com.google.common.io.Closer;
 
 
 /**
- * Wraps multiple writer initializer behind its interface. This is useful when 
there're more than one branch.
+ * Wraps multiple writer initializers, which is useful when more than one 
branch.
  */
 @ToString
 public class MultiInitializer implements Initializer {
+
+  /** Commemorate each (`Optional`) {@link 
org.apache.gobblin.initializer.Initializer.AfterInitializeMemento} of every 
wrapped {@link Initializer} */
+  @Data
+  @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable 
deserialization
+  @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+  @RequiredArgsConstructor
+  private static class Memento implements AfterInitializeMemento {
+    // WARNING: not possible to use `List<Optional<AfterInitializeMemento>>`, 
as first attempted, due to:
+    //   com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: 
Unrecognized field "present" (class java.util.Optional), not marked as
+    //   ignorable (0 known properties: ])
+    //     at 
[Source:(String)"{\"@class\":\"org.apache.gobblin.initializer.MultiInitializer$Memento\",\"orderedInitializersMementos\":[{\"present\":false}]}"]
+    //     (through reference chain: 
org.apache.gobblin.initializer.MultiInitializer$Memento[\"orderedInitializersMementos\"]->java.util.ArrayList[0]
+    //       ->java.util.Optional[\"present\"])",
+    // the following does NOT fix, probably due to `Optional`'s nesting with 
`List`:
+    //   @JsonIgnoreProperties(ignoreUnknown = true)
+    @NonNull private List<AfterInitializeMemento> orderedInitializersMementos;
+  }
+
+
   private final List<Initializer> initializers;
   private final Closer closer;
 
@@ -57,4 +85,21 @@ public class MultiInitializer implements Initializer {
       throw new RuntimeException(e);
     }
   }
-}
+
+  @Override
+  public Optional<AfterInitializeMemento> commemorate() {
+    return Optional.of(new MultiInitializer.Memento(this.initializers.stream()
+        .map(Initializer::commemorate)
+        .map(opt -> opt.orElse(null))
+        .collect(Collectors.toList())));
+  }
+
+  @Override
+  public void recall(AfterInitializeMemento memento) {
+    Memento recollection = 
memento.castAsOrThrow(MultiInitializer.Memento.class, this);
+    Streams.zip(this.initializers.stream(), 
recollection.orderedInitializersMementos.stream(), (initializer, 
nullableInitializerMemento) -> {
+      
Optional.ofNullable(nullableInitializerMemento).ifPresent(initializer::recall);
+      return null;
+    }).count(); // force evaluation, since `Streams.zip` used purely for side 
effects
+  }
+}
\ No newline at end of file
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/initializer/MultiWriterInitializer.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/initializer/MultiWriterInitializer.java
index 7b3ba3e27b..38c5431353 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/initializer/MultiWriterInitializer.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/initializer/MultiWriterInitializer.java
@@ -17,31 +17,41 @@
 
 package org.apache.gobblin.writer.initializer;
 
-import org.apache.gobblin.initializer.Initializer;
-import org.apache.gobblin.initializer.MultiInitializer;
-
+import java.util.Optional;
 import java.util.List;
 
 import lombok.ToString;
 
+import org.apache.gobblin.initializer.Initializer;
+import org.apache.gobblin.initializer.MultiInitializer;
+
 
 @ToString
 public class MultiWriterInitializer implements WriterInitializer {
 
-  private final Initializer intializer;
+  private final Initializer initializer;
 
   public MultiWriterInitializer(List<WriterInitializer> writerInitializers) {
-    this.intializer = new MultiInitializer(writerInitializers);
+    this.initializer = new MultiInitializer(writerInitializers);
   }
 
   @Override
   public void initialize() {
-    this.intializer.initialize();
+    this.initializer.initialize();
   }
 
   @Override
   public void close() {
-    this.intializer.close();
+    this.initializer.close();
   }
 
+  @Override
+  public Optional<AfterInitializeMemento> commemorate() {
+    return this.initializer.commemorate();
+  }
+
+  @Override
+  public void recall(AfterInitializeMemento memento) {
+    this.initializer.recall(memento);
+  }
 }
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/initializer/MultiInitializerTest.java b/gobblin-core/src/test/java/org/apache/gobblin/initializer/MultiInitializerTest.java
new file mode 100644
index 0000000000..79d6514a0b
--- /dev/null
+++ b/gobblin-core/src/test/java/org/apache/gobblin/initializer/MultiInitializerTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.initializer;
+
+import java.util.Arrays;
+import java.util.Optional;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+
+/** Test {@link MultiInitializer} */
+public class MultiInitializerTest {
+  /** Concrete Initializer A - implements `AfterInitializeMemento` */
+  private static class InitializerImplA implements Initializer {
+    private static int instanceCounter = 0;
+
+    @Data
+    @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+    @RequiredArgsConstructor
+    private static class Memento implements Initializer.AfterInitializeMemento 
{
+      @NonNull private String state;
+    }
+
+    @Getter private String state;
+
+    @Override
+    public void initialize() {
+      this.state = "initialized-" + (++instanceCounter);
+    }
+
+    @Override
+    public void close() {
+      // noop
+    }
+
+    @Override
+    public Optional<AfterInitializeMemento> commemorate() {
+      return Optional.of(new Memento(this.state));
+    }
+
+    @Override
+    public void recall(AfterInitializeMemento memento) {
+      Memento recollection = memento.castAsOrThrow(Memento.class, this);
+      this.state = recollection.getState();
+    }
+  }
+
+
+  /** Concrete Initializer B - DOES NOT implement `AfterInitializeMemento`! */
+  private static class InitializerImplBNoMemento implements Initializer {
+    private static int instanceCounter = 0;
+
+    @Getter private String state;
+
+    @Override
+    public void initialize() {
+      this.state = "ignore-" + (++instanceCounter);
+    }
+
+    @Override
+    public void close() {
+      // noop
+    }
+  }
+
+
+  /** Concrete Initializer C - implements `AfterInitializeMemento` */
+  private static class InitializerImplC implements Initializer {
+    private static int instanceCounter = 0;
+
+    @Data
+    @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+    @RequiredArgsConstructor
+    private static class MyMemento implements AfterInitializeMemento {
+      @NonNull private int state;
+    }
+
+    @Getter private int state;
+
+    @Override
+    public void initialize() {
+      this.state = 41 + (++instanceCounter);
+    }
+
+    @Override
+    public void close() {
+      // noop
+    }
+
+    @Override
+    public Optional<AfterInitializeMemento> commemorate() {
+      return Optional.of(new MyMemento(this.state));
+    }
+
+    @Override
+    public void recall(AfterInitializeMemento memento) {
+      MyMemento recollection = memento.castAsOrThrow(MyMemento.class, this);
+      this.state = recollection.getState();
+    }
+  }
+
+
+  @Test
+  public void testMementoCommemorateToSerializeAndDeserializeForRecall() {
+    // create "first generation" of concrete initializers
+    InitializerImplA initializerA_1 = new InitializerImplA();
+    InitializerImplBNoMemento initializerB_1 = new InitializerImplBNoMemento();
+    InitializerImplC initializerC_1 = new InitializerImplC();
+
+    // create the 1st-gen `MultiInitializer`; `initialize` all wrapped 
initializers
+    MultiInitializer multiInitializer1G = new 
MultiInitializer(Arrays.asList(initializerA_1, initializerB_1, initializerC_1));
+    multiInitializer1G.initialize();
+
+    // `commemorate` and `serialize` 1st-gen state
+    Optional<Initializer.AfterInitializeMemento> optMemento1G = 
multiInitializer1G.commemorate();
+    Assert.assertTrue(optMemento1G.isPresent());
+    String serializedMemento = 
Initializer.AfterInitializeMemento.serialize(optMemento1G.get());
+
+    // create a new 2nd-gen `MultiInitializer` with a "second generation" of 
concrete initializers... but DO NOT `initialize` them!
+    InitializerImplA initializerA_2 = new InitializerImplA();
+    InitializerImplBNoMemento initializerB_2 = new InitializerImplBNoMemento();
+    InitializerImplC initializerC_2 = new InitializerImplC();
+    MultiInitializer multiInitializer2G = new 
MultiInitializer(Arrays.asList(initializerA_2, initializerB_2, initializerC_2));
+
+    // verify that state differs between 1st-gen and 2nd-gen `Initializer`s
+    Assert.assertNotEquals(initializerA_1.getState(), 
initializerA_2.getState());
+    Assert.assertNotEquals(initializerB_1.getState(), 
initializerB_2.getState());
+    Assert.assertNotEquals(initializerC_1.getState(), 
initializerC_2.getState());
+
+    try {
+      // verify not possible to `commemorate` prior to `recall()`
+      multiInitializer2G.commemorate();
+      Assert.fail("`commemorate()` somehow possible even before 
`Initializer.initialize()` or `recall()` - despite `@NotNull` annotation on 
`state`");
+    } catch (NullPointerException npe) {
+      // expected
+    }
+
+    // next, `deserialize` 1st-gen state and `recall` it to the 
(un-`initialize`d) 2nd-gen `MultiInitializer`
+    Initializer.AfterInitializeMemento deserializedMemento = 
Initializer.AfterInitializeMemento.deserialize(serializedMemento);
+    multiInitializer2G.recall(deserializedMemento);
+    Optional<Initializer.AfterInitializeMemento> optMemento2G = 
multiInitializer2G.commemorate();
+    Assert.assertTrue(optMemento2G.isPresent());
+
+    // verify that post-`recall`ed memento equivalent to post-`initialize`d one
+    Assert.assertEquals(optMemento1G.get(), optMemento2G.get());
+
+    // WARNING: in real code, DO NOT `initialize` following `recall`, as it 
would reset the state of the wrapped initializers, negating the `recall`
+    multiInitializer2G.initialize();
+    Optional<Initializer.AfterInitializeMemento> optMemento2G_alt = 
multiInitializer2G.commemorate();
+    Assert.assertTrue(optMemento2G_alt.isPresent());
+    // verify not simply the case that mementos always equal
+    Assert.assertNotEquals(optMemento2G.get(), optMemento2G_alt.get());
+  }
+}
diff --git a/gobblin-metastore/build.gradle b/gobblin-metastore/build.gradle
index d5b9bc1a6d..063a0c0a53 100644
--- a/gobblin-metastore/build.gradle
+++ b/gobblin-metastore/build.gradle
@@ -45,7 +45,7 @@ dependencies {
     testCompile externalDependency.slf4jToLog4j
 }
 
-// Begin HACK to get around POM being depenendent on the (empty) 
gobblin-rest-api instead of gobblin-rest-api-rest-client
+// Begin HACK to get around POM being dependent on the (empty) 
gobblin-rest-api instead of gobblin-rest-api-rest-client
 def installer = install.repositories.mavenInstaller
 [installer]*.pom*.whenConfigured {pom ->
     pom.dependencies.find {dep -> dep.groupId == project.group && 
dep.artifactId == 'gobblin-rest-api' }.artifactId = 
'gobblin-rest-api-rest-client'
diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java
index 8226de7698..0f78ff31b6 100644
--- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java
@@ -17,28 +17,23 @@
 
 package org.apache.gobblin.writer.initializer;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.publisher.JdbcPublisher;
-import org.apache.gobblin.source.workunit.WorkUnit;
-import org.apache.gobblin.util.ForkOperatorUtils;
-import org.apache.gobblin.util.jdbc.DataSourceBuilder;
-import org.apache.gobblin.writer.Destination;
-import org.apache.gobblin.writer.Destination.DestinationType;
-import org.apache.gobblin.writer.commands.JdbcWriterCommands;
-import org.apache.gobblin.writer.commands.JdbcWriterCommandsFactory;
-import org.apache.gobblin.source.extractor.JobCommitPolicy;
-
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Collection;
+import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import javax.sql.DataSource;
 
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.Setter;
 import lombok.ToString;
 
 import org.apache.commons.lang3.StringUtils;
@@ -50,12 +45,37 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.publisher.JdbcPublisher;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.ForkOperatorUtils;
+import org.apache.gobblin.util.jdbc.DataSourceBuilder;
+import org.apache.gobblin.writer.Destination;
+import org.apache.gobblin.writer.Destination.DestinationType;
+import org.apache.gobblin.writer.commands.JdbcWriterCommands;
+import org.apache.gobblin.writer.commands.JdbcWriterCommandsFactory;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
+
 
 /**
  * Initialize for JDBC writer and also performs clean up.
  */
 @ToString
 public class JdbcWriterInitializer implements WriterInitializer {
+  /** Commemorate all and exactly those fields, for which to preserve instance 
state - the name(s) of temporary staging DB tables */
+  @Data
+  @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable 
deserialization
+  @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+  @AllArgsConstructor
+  private static class Memento implements AfterInitializeMemento {
+    // NOTE: as this clearly MAY be `null` (see below), DO NOT mark 
`@NonNull`, to avoid:
+    //   userCreatedStagingTable is marked non-null but is null
+    private String userCreatedStagingTable;
+    @NonNull private Set<String> createdStagingTables;
+  }
+
+
   private static final Logger LOG = 
LoggerFactory.getLogger(JdbcWriterInitializer.class);
   private static final String STAGING_TABLE_FORMAT = "stage_%d";
   private static final int NAMING_STAGING_TABLE_TRIAL = 10;
@@ -95,7 +115,7 @@ public class JdbcWriterInitializer implements WriterInitializer {
    * Drop table if it's created by this instance.
    * Truncate staging tables passed by user.
    * {@inheritDoc}
-   * @see org.apache.gobblin.Initializer#close()
+   * @see org.apache.gobblin.initializer.Initializer#close()
    */
   @Override
   public void close() {
@@ -200,7 +220,6 @@ public class JdbcWriterInitializer implements WriterInitializer {
    * 3.1. Create staging table with unique name.
    * 3.2. Try to drop and recreate the table to confirm if we can drop it 
later.
    * 4. Update Workunit state with staging table information.
-   * @param state
    */
   @Override
   public void initialize() {
@@ -225,7 +244,7 @@ public class JdbcWriterInitializer implements WriterInitializer {
         i++;
 
         if (isSkipStaging) {
-          LOG.info("User chose to skip staing table on branch " + 
this.branchId + " workunit " + i);
+          LOG.info("User chose to skip staging table on branch " + 
this.branchId + " workunit " + i);
           wu.setProp(stagingTableKey, publishTable);
 
           if (i == 0) {
@@ -277,6 +296,18 @@ public class JdbcWriterInitializer implements WriterInitializer {
     }
   }
 
+  @Override
+  public Optional<AfterInitializeMemento> commemorate() {
+    return Optional.of(new 
JdbcWriterInitializer.Memento(this.userCreatedStagingTable, 
Sets.newHashSet(this.createdStagingTables)));
+  }
+
+  @Override
+  public void recall(AfterInitializeMemento memento) {
+    Memento recollection = 
memento.castAsOrThrow(JdbcWriterInitializer.Memento.class, this);
+    this.userCreatedStagingTable = recollection.userCreatedStagingTable;
+    this.createdStagingTables = 
Sets.newHashSet(recollection.createdStagingTables);
+  }
+
   private JdbcWriterCommands createJdbcWriterCommands(Connection conn) {
     String destKey = 
ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.WRITER_DESTINATION_TYPE_KEY,
         this.branches, this.branchId);
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
index 6346e08df3..b43c079c10 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -40,7 +40,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-
+import com.google.common.io.Closer;
 import io.temporal.failure.ApplicationFailure;
 
 import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
@@ -48,6 +48,8 @@ import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.commit.DeliverySemantics;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
+import org.apache.gobblin.initializer.Initializer;
 import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.runtime.JobContext;
@@ -58,6 +60,8 @@ import org.apache.gobblin.runtime.TaskStateCollectorService;
 import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
 import 
org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
 import org.apache.gobblin.source.extractor.JobCommitPolicy;
+import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
+import org.apache.gobblin.source.workunit.WorkUnitStream;
 import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
 import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
 import org.apache.gobblin.temporal.ddm.work.CommitStats;
@@ -69,6 +73,7 @@ import org.apache.gobblin.util.Either;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.PropertiesUtils;
 import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
 
 
 @Slf4j
@@ -76,7 +81,7 @@ public class CommitActivityImpl implements CommitActivity {
 
   static int DEFAULT_NUM_DESERIALIZATION_THREADS = 10;
   static int DEFAULT_NUM_COMMIT_THREADS = 1;
-  static String UNDEFINED_JOB_NAME = "<job_name_stub>";
+  static String UNDEFINED_JOB_NAME = "<<UNKNOWN JOB NAME>>";
 
   @Override
   public CommitStats commit(WUProcessingSpec workSpec) {
@@ -105,6 +110,22 @@ public class CommitActivityImpl implements CommitActivity {
       } catch (FailedDatasetUrnsException exception) {
         log.warn("Some datasets failed to be committed, proceeding with 
publishing commit step", exception);
         optFailure = Optional.of(exception);
+      } finally {
+        // if Work Discovery transmitted any writer/converter 
`Initializer.AfterInitializeMemento`s within `jobState`, deserialize them now to
+        // `.recall()` and "re-initialize" equivalent writer and/or 
converter(s) `Initializer`s, to complete their `.close()`
+        // NOTE: the "revived" `Initializer`s are constructed with empty 
placeholder WUs
+        Closer closer = Closer.create(); // (purely to suppress exceptions)
+        
Optional.ofNullable(jobState.getProp(ConfigurationKeys.WRITER_INITIALIZER_SERIALIZED_MEMENTO_KEY)).map(mementoProp
 ->
+            Initializer.AfterInitializeMemento.deserialize(mementoProp)
+        ).ifPresent(memento ->
+            closer.register(WriterInitializerFactory.newInstace(jobState, 
createEmptyWorkUnitStream())).recall(memento)
+        );
+        
Optional.ofNullable(jobState.getProp(ConfigurationKeys.CONVERTER_INITIALIZERS_SERIALIZED_MEMENTOS_KEY)).map(mementoProp
 ->
+            Initializer.AfterInitializeMemento.deserialize(mementoProp)
+        ).ifPresent(memento ->
+            closer.register(ConverterInitializerFactory.newInstance(jobState, 
createEmptyWorkUnitStream())).recall(memento)
+        );
+        closer.close();
       }
 
       boolean shouldIncludeFailedTasks = 
PropertiesUtils.getPropAsBoolean(jobState.getProperties(), 
ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");
@@ -247,4 +268,8 @@ public class CommitActivityImpl implements CommitActivity {
   private static String calcCommitId(WUProcessingSpec workSpec) {
     return new Path(workSpec.getWorkUnitsDir()).getParent().getName();
   }
-}
\ No newline at end of file
+
+  private static WorkUnitStream createEmptyWorkUnitStream() {
+    return new BasicWorkUnitStream.Builder(Lists.newArrayList()).build();
+  }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index 63d9b6b1ea..33f60bd779 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
@@ -40,7 +41,9 @@ import io.temporal.failure.ApplicationFailure;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.converter.initializer.ConverterInitializer;
 import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
+import org.apache.gobblin.initializer.Initializer;
 import org.apache.gobblin.destination.DestinationDatasetHandlerService;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
@@ -63,6 +66,7 @@ import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
 import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
+import org.apache.gobblin.writer.initializer.WriterInitializer;
 import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
 
 
@@ -106,6 +110,16 @@ public class GenerateWorkUnitsImpl implements GenerateWorkUnits {
   }
 
 
+  /** [Internal, impl class] Intermediate result of generated work units with 
insightful analysis extracted by pre-processing them */
+  @Data
+  private static class WorkUnitsWithInsights {
+    private final List<WorkUnit> workUnits;
+    private final Set<String> pathsToCleanUp;
+    private final Optional<Initializer.AfterInitializeMemento> 
optWriterMemento;
+    private final Optional<Initializer.AfterInitializeMemento> 
optConverterMemento;
+  }
+
+
   @Override
   public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, 
EventSubmitterContext eventSubmitterContext) {
     // TODO: decide whether to acquire a job lock (as MR did)!
@@ -128,8 +142,8 @@ public class GenerateWorkUnitsImpl implements GenerateWorkUnits {
       FileSystem fs = closer.register(JobStateUtils.openFileSystem(jobState));
       fs.mkdirs(workDirRoot);
 
-      Set<String> pathsToCleanUp = new HashSet<>();
-      List<WorkUnit> workUnits = 
generateWorkUnitsForJobStateAndCollectCleanupPaths(jobState, 
eventSubmitterContext, closer, pathsToCleanUp);
+      WorkUnitsWithInsights genWUsInsights = 
generateWorkUnitsForJobStateAndCollectCleanupPaths(jobState, 
eventSubmitterContext, closer);
+      List<WorkUnit> workUnits = genWUsInsights.getWorkUnits();
 
       int numSizeSummaryQuantiles = 
getConfiguredNumSizeSummaryQuantiles(jobState);
       WorkUnitsSizeSummary wuSizeSummary = 
digestWorkUnitsSize(workUnits).asSizeSummary(numSizeSummaryQuantiles);
@@ -138,11 +152,20 @@ public class GenerateWorkUnitsImpl implements GenerateWorkUnits {
       // exceed available memory and this activity execution were to fail, a 
subsequent re-attempt would know the amount of work, to guide re-config/attempt
       createWorkPreparedSizeDistillationTimer(wuSizeSummary, 
eventSubmitterContext).stop();
 
+      // add any (serialized) mementos before serializing `jobState`, for 
later `recall` during `CommitActivityImpl`
+      genWUsInsights.optWriterMemento.ifPresent(memento ->
+          
jobState.setProp(ConfigurationKeys.WRITER_INITIALIZER_SERIALIZED_MEMENTO_KEY,
+              Initializer.AfterInitializeMemento.serialize(memento))
+      );
+      genWUsInsights.optConverterMemento.ifPresent(memento ->
+          
jobState.setProp(ConfigurationKeys.CONVERTER_INITIALIZERS_SERIALIZED_MEMENTOS_KEY,
+              Initializer.AfterInitializeMemento.serialize(memento))
+      );
       JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs);
       JobStateUtils.writeJobState(jobState, workDirRoot, fs); // ATTENTION: 
the writing of `JobState` after all WUs signifies WU gen+serialization now 
complete
 
       String sourceClassName = JobStateUtils.getSourceClassName(jobState);
-      return new GenerateWorkUnitsResult(jobState.getTaskCount(), 
sourceClassName, wuSizeSummary, pathsToCleanUp);
+      return new GenerateWorkUnitsResult(jobState.getTaskCount(), 
sourceClassName, wuSizeSummary, genWUsInsights.getPathsToCleanUp());
     } catch (ReflectiveOperationException roe) {
       String errMsg = "Unable to construct a source for generating workunits 
for job " + jobState.getJobId();
       log.error(errMsg, roe);
@@ -157,8 +180,7 @@ public class GenerateWorkUnitsImpl implements GenerateWorkUnits {
     }
   }
 
-  protected List<WorkUnit> 
generateWorkUnitsForJobStateAndCollectCleanupPaths(JobState jobState, 
EventSubmitterContext eventSubmitterContext, Closer closer,
-      Set<String> pathsToCleanUp)
+  protected WorkUnitsWithInsights 
generateWorkUnitsForJobStateAndCollectCleanupPaths(JobState jobState, 
EventSubmitterContext eventSubmitterContext, Closer closer)
       throws ReflectiveOperationException {
     // report (timer) metrics for "Work Discovery", *planning only* - NOT 
including WU prep, like serialization, `DestinationDatasetHandlerService`ing, 
etc.
     // IMPORTANT: for accurate timing, SEPARATELY emit 
`.createWorkPreparationTimer`, to record time prior to measuring the WU size 
required for that one
@@ -179,18 +201,20 @@ public class GenerateWorkUnitsImpl implements GenerateWorkUnits {
 
     if (!workUnitStream.getWorkUnits().hasNext()) { // no work unit to run: 
entirely normal result (not a failure)
       log.warn("No work units created for job " + jobState.getJobId());
-      return Lists.newArrayList();
+      return new WorkUnitsWithInsights(Lists.newArrayList(), new HashSet<>(), 
Optional.empty(), Optional.empty());
     }
 
     boolean canCleanUpTempDirs = false; // unlike `AbstractJobLauncher` 
running the job end-to-end, this is Work Discovery only, so WAY TOO SOON for 
cleanup
     DestinationDatasetHandlerService datasetHandlerService = closer.register(
         new DestinationDatasetHandlerService(jobState, canCleanUpTempDirs, 
eventSubmitterContext.create()));
     WorkUnitStream handledWorkUnitStream = 
datasetHandlerService.executeHandlers(workUnitStream);
-    pathsToCleanUp.addAll(calculateWorkDirsToCleanup(handledWorkUnitStream));
-    // initialize writer and converter(s)
-    // TODO: determine whether registration here is effective, or the 
lifecycle of this activity is too brief (as is likely!)
-    closer.register(WriterInitializerFactory.newInstace(jobState, 
handledWorkUnitStream)).initialize();
-    closer.register(ConverterInitializerFactory.newInstance(jobState, 
handledWorkUnitStream)).initialize();
+    Set<String> pathsToCleanUp = new 
HashSet<>(calculateWorkDirsToCleanup(handledWorkUnitStream));
+
+    // initialize writer and converter(s), but DO NOT `.close()` them here; 
rather `.commemorate()` for later `.recall()` during Commit
+    WriterInitializer writerInitializer = 
WriterInitializerFactory.newInstace(jobState, handledWorkUnitStream);
+    writerInitializer.initialize();
+    ConverterInitializer converterInitializer = 
ConverterInitializerFactory.newInstance(jobState, handledWorkUnitStream);
+    converterInitializer.initialize();
 
     // update jobState before it gets serialized
     long startTime = System.currentTimeMillis();
@@ -206,7 +230,11 @@ public class GenerateWorkUnitsImpl implements GenerateWorkUnits {
     // dump the work unit if tracking logs are enabled (post any 
materialization for counting)
     WorkUnitStream trackedWorkUnitStream = 
AbstractJobLauncher.addWorkUnitTrackingPerConfig(preparedWorkUnitStream, 
jobState, log);
 
-    return AbstractJobLauncher.materializeWorkUnitList(trackedWorkUnitStream);
+    return new WorkUnitsWithInsights(
+        AbstractJobLauncher.materializeWorkUnitList(trackedWorkUnitStream),
+        pathsToCleanUp,
+        writerInitializer.commemorate(),
+        converterInitializer.commemorate());
   }
 
   protected static Set<String> calculateWorkDirsToCleanup(WorkUnitStream 
workUnitStream) {
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/Workload.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/Workload.java
index 239825f7cf..0bb9192cc6 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/Workload.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/Workload.java
@@ -17,10 +17,11 @@
 
 package org.apache.gobblin.temporal.util.nesting.work;
 
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import java.util.Iterator;
 import java.util.Optional;
 
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
 
 /**
  * {@link Workload} models a logical collection of homogenous inputs over 
which a "foreach" operation can asynchronously apply


Reply via email to