This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 37556f2  [BEAM-6994] SamzaRunner: further improvements for upgrading 
Samza (#8221)
37556f2 is described below

commit 37556f2cd3ce32e23e16cb653632eafd7169c8e2
Author: xinyuiscool <[email protected]>
AuthorDate: Mon Apr 8 18:19:27 2019 -0700

    [BEAM-6994] SamzaRunner: further improvements for upgrading Samza (#8221)
---
 runners/samza/job-server/build.gradle              | 49 ++++++++++++
 .../samza/SamzaPipelineLifeCycleListener.java      | 44 +++++++++++
 .../beam/runners/samza/SamzaPipelineOptions.java   |  9 +++
 .../beam/runners/samza/SamzaPipelineResult.java    | 12 ++-
 .../org/apache/beam/runners/samza/SamzaRunner.java | 87 +++++++++++++++++++---
 .../runners/samza/adapter/BoundedSourceSystem.java |  4 +-
 .../samza/adapter/UnboundedSourceSystem.java       |  5 +-
 .../apache/beam/runners/samza/runtime/DoFnOp.java  | 12 ++-
 .../beam/runners/samza/runtime/GroupByKeyOp.java   | 14 +++-
 .../runners/samza/runtime/SamzaDoFnRunners.java    |  3 +-
 .../samza/runtime/SamzaStoreStateInternals.java    | 22 +++---
 .../samza/runtime/SamzaTimerInternalsFactory.java  | 18 ++++-
 .../runners/samza/translation/ConfigBuilder.java   | 84 ++++++++++++---------
 .../samza/translation/GroupByKeyTranslator.java    | 33 +++++---
 .../samza/translation/ImpulseTranslator.java       |  7 +-
 .../translation/ParDoBoundMultiTranslator.java     |  9 +++
 .../SamzaPortablePipelineTranslator.java           |  6 +-
 .../translation/TransformConfigGenerator.java      |  4 +-
 .../samza/util/SamzaPipelineTranslatorUtils.java   | 17 +++++
 .../runtime/SamzaTimerInternalsFactoryTest.java    |  4 +-
 settings.gradle                                    |  2 +
 21 files changed, 361 insertions(+), 84 deletions(-)

diff --git a/runners/samza/job-server/build.gradle 
b/runners/samza/job-server/build.gradle
new file mode 100644
index 0000000..a8ab8b9
--- /dev/null
+++ b/runners/samza/job-server/build.gradle
@@ -0,0 +1,49 @@
+import org.apache.beam.gradle.BeamModulePlugin
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: 'org.apache.beam.module'
+apply plugin: 'application'
+// we need to set mainClassName before applying shadow plugin
+mainClassName = "org.apache.beam.runners.samza.SamzaJobServerDriver"
+
+repositories {
+  maven {
+    url "https://artifactory.corp.linkedin.com:8083/artifactory/DDS/";
+  }
+}
+
+applyJavaNature(
+    validateShadowJar: false,
+    exportJavadoc: false,
+    shadowClosure: {
+      append "reference.conf"
+    },
+)
+
+dependencies {
+  compile project(path: ":beam-runners-samza", configuration: "shadow")
+  compile group: "org.slf4j", name: "jcl-over-slf4j", version: 
dependencies.create(project.library.java.slf4j_api).getVersion()
+  compile library.java.slf4j_simple
+  shadow library.java.guava
+}
+
+runShadow {
+  args = []
+}
\ No newline at end of file
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineLifeCycleListener.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineLifeCycleListener.java
new file mode 100644
index 0000000..9585d77
--- /dev/null
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineLifeCycleListener.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.context.ExternalContext;
+
+/** Life cycle listener for a Samza pipeline during runtime. */
+public interface SamzaPipelineLifeCycleListener {
+  /** Callback when the pipeline options are created. */
+  void onInit(Config config);
+
+  /** Callback when the pipeline is started. */
+  ExternalContext onStart();
+
+  /**
+   * Callback after the pipeline is submitted. This will be invoked only for 
Samza jobs submitted
+   * to a cluster.
+   */
+  void onSubmit();
+
+  /** Callback after the pipeline is finished. */
+  void onFinish();
+
+  /** A registrar for {@link SamzaPipelineLifeCycleListener}. */
+  interface Registrar {
+    SamzaPipelineLifeCycleListener getLifeCycleListener();
+  }
+}
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
index 10c5351..5ef07cf 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
@@ -17,12 +17,15 @@
  */
 package org.apache.beam.runners.samza;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.samza.config.ConfigFactory;
 import org.apache.samza.config.factories.PropertiesConfigFactory;
+import org.apache.samza.metrics.MetricsReporter;
 
 /** Options which can be used to configure a Samza PortablePipelineRunner. */
 public interface SamzaPipelineOptions extends PipelineOptions {
@@ -102,4 +105,10 @@ public interface SamzaPipelineOptions extends 
PipelineOptions {
   int getTimerBufferSize();
 
   void setTimerBufferSize(int timerBufferSize);
+
+  @JsonIgnore
+  @Description("The metrics reporters that will be used to emit metrics.")
+  List<MetricsReporter> getMetricsReporters();
+
+  void setMetricsReporters(List<MetricsReporter> reporters);
 }
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java
index db71dc9..063f0b4 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java
@@ -38,12 +38,17 @@ public class SamzaPipelineResult implements PipelineResult {
   private final SamzaExecutionContext executionContext;
   private final ApplicationRunner runner;
   private final StreamApplication app;
+  private final SamzaPipelineLifeCycleListener listener;
 
   public SamzaPipelineResult(
-      StreamApplication app, ApplicationRunner runner, SamzaExecutionContext 
executionContext) {
+      StreamApplication app,
+      ApplicationRunner runner,
+      SamzaExecutionContext executionContext,
+      SamzaPipelineLifeCycleListener listener) {
     this.executionContext = executionContext;
     this.runner = runner;
     this.app = app;
+    this.listener = listener;
   }
 
   @Override
@@ -70,6 +75,11 @@ public class SamzaPipelineResult implements PipelineResult {
     }
 
     final StateInfo stateInfo = getStateInfo();
+
+    if (listener != null && (stateInfo.state == State.DONE || stateInfo.state 
== State.FAILED)) {
+      listener.onFinish();
+    }
+
     if (stateInfo.state == State.FAILED) {
       throw stateInfo.error;
     }
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
index 3589edb..36d47a8 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
@@ -17,7 +17,11 @@
  */
 package org.apache.beam.runners.samza;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.ServiceLoader;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.samza.translation.ConfigBuilder;
 import org.apache.beam.runners.samza.translation.PViewToIdMapper;
@@ -32,7 +36,12 @@ import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.ExternalContext;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.metrics.MetricsReporterFactory;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.runtime.ApplicationRunners;
 import org.slf4j.Logger;
@@ -51,14 +60,27 @@ public class SamzaRunner extends 
PipelineRunner<SamzaPipelineResult> {
   }
 
   private final SamzaPipelineOptions options;
+  private final SamzaPipelineLifeCycleListener listener;
 
   private SamzaRunner(SamzaPipelineOptions options) {
     this.options = options;
+    final Iterator<SamzaPipelineLifeCycleListener.Registrar> listenerReg =
+        
ServiceLoader.load(SamzaPipelineLifeCycleListener.Registrar.class).iterator();
+    this.listener =
+        listenerReg.hasNext() ? 
Iterators.getOnlyElement(listenerReg).getLifeCycleListener() : null;
   }
 
-  SamzaPipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline) {
-    ConfigBuilder configBuilder = new ConfigBuilder(options);
-    SamzaPortablePipelineTranslator.createConfig(pipeline, configBuilder);
+  public SamzaPipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline) {
+    final ConfigBuilder configBuilder = new ConfigBuilder(options);
+    SamzaPortablePipelineTranslator.createConfig(pipeline, configBuilder, 
options);
+
+    final Config config = configBuilder.build();
+    options.setConfigOverride(config);
+
+    if (listener != null) {
+      listener.onInit(config);
+    }
+
     final SamzaExecutionContext executionContext = new 
SamzaExecutionContext(options);
     final StreamApplication app =
         appDescriptor -> {
@@ -66,11 +88,8 @@ public class SamzaRunner extends 
PipelineRunner<SamzaPipelineResult> {
           SamzaPortablePipelineTranslator.translate(
               pipeline, new PortableTranslationContext(appDescriptor, 
options));
         };
-    final ApplicationRunner runner =
-        ApplicationRunners.getApplicationRunner(app, configBuilder.build());
-    final SamzaPipelineResult result = new SamzaPipelineResult(app, runner, 
executionContext);
-    runner.run();
-    return result;
+
+    return runSamzaApp(app, config, executionContext);
   }
 
   @Override
@@ -91,19 +110,63 @@ public class SamzaRunner extends 
PipelineRunner<SamzaPipelineResult> {
 
     final ConfigBuilder configBuilder = new ConfigBuilder(options);
     SamzaPipelineTranslator.createConfig(pipeline, options, idMap, 
configBuilder);
+
+    final Config config = configBuilder.build();
+    options.setConfigOverride(config);
+
+    if (listener != null) {
+      listener.onInit(config);
+    }
+
     final SamzaExecutionContext executionContext = new 
SamzaExecutionContext(options);
+    final Map<String, MetricsReporterFactory> reporterFactories = 
getMetricsReporters();
 
     final StreamApplication app =
         appDescriptor -> {
           
appDescriptor.withApplicationContainerContextFactory(executionContext.new 
Factory());
+          appDescriptor.withMetricsReporterFactories(reporterFactories);
+
           SamzaPipelineTranslator.translate(
               pipeline, new TranslationContext(appDescriptor, idMap, options));
         };
 
-    final ApplicationRunner runner =
-        ApplicationRunners.getApplicationRunner(app, configBuilder.build());
-    final SamzaPipelineResult result = new SamzaPipelineResult(app, runner, 
executionContext);
-    runner.run();
+    return runSamzaApp(app, config, executionContext);
+  }
+
+  private Map<String, MetricsReporterFactory> getMetricsReporters() {
+    if (options.getMetricsReporters() != null) {
+      final Map<String, MetricsReporterFactory> reporters = new HashMap<>();
+      for (int i = 0; i < options.getMetricsReporters().size(); i++) {
+        final String name = "beam-metrics-reporter-" + i;
+        final MetricsReporter reporter = options.getMetricsReporters().get(i);
+
+        reporters.put(name, (MetricsReporterFactory) (nm, processorId, config) 
-> reporter);
+      }
+      return reporters;
+    } else {
+      return Collections.emptyMap();
+    }
+  }
+
+  private SamzaPipelineResult runSamzaApp(
+      StreamApplication app, Config config, SamzaExecutionContext 
executionContext) {
+
+    final ApplicationRunner runner = 
ApplicationRunners.getApplicationRunner(app, config);
+    final SamzaPipelineResult result =
+        new SamzaPipelineResult(app, runner, executionContext, listener);
+
+    ExternalContext externalContext = null;
+    if (listener != null) {
+      externalContext = listener.onStart();
+    }
+
+    runner.run(externalContext);
+
+    if (listener != null
+        && options.getSamzaExecutionEnvironment() == 
SamzaExecutionEnvironment.YARN) {
+      listener.onSubmit();
+    }
+
     return result;
   }
 }
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java
index dfcea8b..1d776b8 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java
@@ -68,6 +68,7 @@ import org.slf4j.LoggerFactory;
  */
 // TODO: instrumentation for the consumer
 public class BoundedSourceSystem {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BoundedSourceSystem.class);
 
   private static <T> List<BoundedSource<T>> split(
       BoundedSource<T> source, SamzaPipelineOptions pipelineOptions) throws 
Exception {
@@ -417,7 +418,8 @@ public class BoundedSourceSystem {
 
     @Override
     public SystemProducer getProducer(String systemName, Config config, 
MetricsRegistry registry) {
-      throw new UnsupportedOperationException("Cannot create a producer for an 
input system");
+      LOG.info("System " + systemName + " does not have producer.");
+      return null;
     }
 
     @Override
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
index a3f958b..f5066cd 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
@@ -71,6 +71,8 @@ import org.slf4j.LoggerFactory;
  * into partitions. Samza creates the job model by assigning partitions to 
Samza tasks.
  */
 public class UnboundedSourceSystem {
+  private static final Logger LOG = 
LoggerFactory.getLogger(UnboundedSourceSystem.class);
+
   // A dummy message used to force the consumer to wake up immediately and 
check the
   // lastException field, which will be populated.
   private static final IncomingMessageEnvelope CHECK_LAST_EXCEPTION_ENVELOPE =
@@ -460,7 +462,8 @@ public class UnboundedSourceSystem {
 
     @Override
     public SystemProducer getProducer(String systemName, Config config, 
MetricsRegistry registry) {
-      throw new UnsupportedOperationException("Cannot create a producer for an 
input system");
+      LOG.info("System " + systemName + " does not have producer.");
+      return null;
     }
 
     @Override
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
index eb1d997..d986b15 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -74,8 +75,10 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, 
OutT, Void> {
   // NOTE: we use HashMap here to guarantee Serializability
   private final HashMap<String, PCollectionView<?>> idToViewMap;
   private final String stepName;
+  private final String stepId;
   private final Coder<InT> inputCoder;
   private final HashMap<TupleTag<?>, Coder<?>> outputCoders;
+  private final PCollection.IsBounded isBounded;
 
   // portable api related
   private final boolean isPortable;
@@ -114,6 +117,8 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, 
OutT, Void> {
       Map<String, PCollectionView<?>> idToViewMap,
       OutputManagerFactory<OutT> outputManagerFactory,
       String stepName,
+      String stepId,
+      PCollection.IsBounded isBounded,
       boolean isPortable,
       RunnerApi.ExecutableStagePayload stagePayload,
       Map<String, TupleTag<?>> idToTupleTagMap,
@@ -128,7 +133,9 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, 
OutT, Void> {
     this.idToViewMap = new HashMap<>(idToViewMap);
     this.outputManagerFactory = outputManagerFactory;
     this.stepName = stepName;
+    this.stepId = stepId;
     this.keyCoder = keyCoder;
+    this.isBounded = isBounded;
     this.isPortable = isPortable;
     this.stagePayload = stagePayload;
     this.idToTupleTagMap = new HashMap<>(idToTupleTagMap);
@@ -152,9 +159,10 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, 
OutT, Void> {
             .get()
             .as(SamzaPipelineOptions.class);
 
+    final String stateId = "pardo-" + stepId;
     final SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory =
         SamzaStoreStateInternals.createStateInternalFactory(
-            null, context.getTaskContext(), pipelineOptions, signature, 
mainOutputTag);
+            stateId, null, context.getTaskContext(), pipelineOptions, 
signature);
 
     this.timerInternalsFactory =
         SamzaTimerInternalsFactory.createTimerInternalFactory(
@@ -163,6 +171,7 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, 
OutT, Void> {
             getTimerStateId(signature),
             nonKeyedStateInternalsFactory,
             windowingStrategy,
+            isBounded,
             pipelineOptions);
 
     this.sideInputHandler =
@@ -188,6 +197,7 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, 
OutT, Void> {
               doFn,
               windowingStrategy,
               stepName,
+              stateId,
               context,
               mainOutputTag,
               sideInputHandler,
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java
index f593ecc..4748ebb 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java
@@ -43,6 +43,7 @@ import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.samza.config.Config;
@@ -65,6 +66,8 @@ public class GroupByKeyOp<K, InputT, OutputT>
   private final Coder<K> keyCoder;
   private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn;
   private final String stepName;
+  private final String stepId;
+  private final IsBounded isBounded;
 
   private transient StateInternalsFactory<K> stateInternalsFactory;
   private transient SamzaTimerInternalsFactory<K> timerInternalsFactory;
@@ -77,11 +80,15 @@ public class GroupByKeyOp<K, InputT, OutputT>
       SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn,
       WindowingStrategy<?, BoundedWindow> windowingStrategy,
       OutputManagerFactory<KV<K, OutputT>> outputManagerFactory,
-      String stepName) {
+      String stepName,
+      String stepId,
+      IsBounded isBounded) {
     this.mainOutputTag = mainOutputTag;
     this.windowingStrategy = windowingStrategy;
     this.outputManagerFactory = outputManagerFactory;
     this.stepName = stepName;
+    this.stepId = stepId;
+    this.isBounded = isBounded;
 
     if (!(inputCoder instanceof KeyedWorkItemCoder)) {
       throw new IllegalArgumentException(
@@ -108,13 +115,13 @@ public class GroupByKeyOp<K, InputT, OutputT>
 
     final SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory =
         SamzaStoreStateInternals.createStateInternalFactory(
-            null, context.getTaskContext(), pipelineOptions, null, 
mainOutputTag);
+            stepId, null, context.getTaskContext(), pipelineOptions, null);
 
     final DoFnRunners.OutputManager outputManager = 
outputManagerFactory.create(emitter);
 
     this.stateInternalsFactory =
         new SamzaStoreStateInternals.Factory<>(
-            mainOutputTag.getId(),
+            stepId,
             Collections.singletonMap(
                 SamzaStoreStateInternals.BEAM_STORE,
                 
SamzaStoreStateInternals.getBeamStore(context.getTaskContext())),
@@ -128,6 +135,7 @@ public class GroupByKeyOp<K, InputT, OutputT>
             TIMER_STATE_ID,
             nonKeyedStateInternalsFactory,
             windowingStrategy,
+            isBounded,
             pipelineOptions);
 
     final DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFn =
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
index 692b664..0aa95a8 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
@@ -61,6 +61,7 @@ public class SamzaDoFnRunners {
       DoFn<InT, FnOutT> doFn,
       WindowingStrategy<?, ?> windowingStrategy,
       String stepName,
+      String stateId,
       Context context,
       TupleTag<FnOutT> mainOutputTag,
       SideInputHandler sideInputHandler,
@@ -77,7 +78,7 @@ public class SamzaDoFnRunners {
     final DoFnSignature signature = 
DoFnSignatures.getSignature(doFn.getClass());
     final SamzaStoreStateInternals.Factory<?> stateInternalsFactory =
         SamzaStoreStateInternals.createStateInternalFactory(
-            keyCoder, context.getTaskContext(), pipelineOptions, signature, 
mainOutputTag);
+            stateId, keyCoder, context.getTaskContext(), pipelineOptions, 
signature);
 
     final SamzaExecutionContext executionContext =
         (SamzaExecutionContext) context.getApplicationContainerContext();
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
index 620aae2..5b266bf 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.StateInternals;
@@ -60,7 +61,6 @@ import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.values.TupleTag;
 import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.primitives.Ints;
@@ -82,16 +82,19 @@ public class SamzaStoreStateInternals<K> implements 
StateInternals {
   private final K key;
   private final byte[] keyBytes;
   private final int batchGetSize;
+  private final String stageId;
 
   private SamzaStoreStateInternals(
       Map<String, KeyValueStore<byte[], byte[]>> stores,
       @Nullable K key,
       @Nullable byte[] keyBytes,
+      String stageId,
       int batchGetSize) {
     this.stores = stores;
     this.key = key;
     this.keyBytes = keyBytes;
     this.batchGetSize = batchGetSize;
+    this.stageId = stageId;
   }
 
   @SuppressWarnings("unchecked")
@@ -100,11 +103,11 @@ public class SamzaStoreStateInternals<K> implements 
StateInternals {
   }
 
   static Factory createStateInternalFactory(
+      String id,
       Coder<?> keyCoder,
       TaskContext context,
       SamzaPipelineOptions pipelineOptions,
-      DoFnSignature signature,
-      TupleTag<?> mainOutputTag) {
+      DoFnSignature signature) {
     final int batchGetSize = pipelineOptions.getStoreBatchGetSize();
     final Map<String, KeyValueStore<byte[], byte[]>> stores = new HashMap<>();
     stores.put(BEAM_STORE, getBeamStore(context));
@@ -121,12 +124,7 @@ public class SamzaStoreStateInternals<K> implements 
StateInternals {
     } else {
       stateKeyCoder = VoidCoder.of();
     }
-    return new Factory<>(
-        // TODO: ??? what to do with empty output?
-        mainOutputTag == null ? "null" : mainOutputTag.getId(),
-        stores,
-        stateKeyCoder,
-        batchGetSize);
+    return new Factory<>(Objects.toString(id), stores, stateKeyCoder, 
batchGetSize);
   }
 
   @Override
@@ -231,7 +229,6 @@ public class SamzaStoreStateInternals<K> implements 
StateInternals {
       final DataOutputStream dos = new DataOutputStream(baos);
 
       try {
-        dos.writeUTF(stageId);
         if (key != null) {
           keyCoder.encode(key, baos);
         }
@@ -244,7 +241,7 @@ public class SamzaStoreStateInternals<K> implements 
StateInternals {
         throw new RuntimeException("Cannot encode key for state store", e);
       }
 
-      return new SamzaStoreStateInternals<>(stores, key, baos.toByteArray(), 
batchGetSize);
+      return new SamzaStoreStateInternals<>(stores, key, baos.toByteArray(), 
stageId, batchGetSize);
     }
   }
 
@@ -273,7 +270,8 @@ public class SamzaStoreStateInternals<K> implements 
StateInternals {
         dos.writeUTF(namespace.stringKey());
 
         if (userStore == null) {
-          // for system state, we need to differentiate based on the address
+          // for system state, we need to differentiate based on the following:
+          dos.writeUTF(stageId);
           dos.writeUTF(address.getId());
         }
       } catch (IOException e) {
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
index d014a1e..146a9a8 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
@@ -36,6 +36,8 @@ import org.apache.beam.runners.samza.state.SamzaSetState;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.samza.operators.Scheduler;
 import org.joda.time.Instant;
@@ -55,6 +57,7 @@ public class SamzaTimerInternalsFactory<K> implements 
TimerInternalsFactory<K> {
   private final Scheduler<KeyedTimerData<K>> timerRegistry;
   private final int timerBufferSize;
   private final SamzaTimerState state;
+  private final IsBounded isBounded;
 
   private Instant inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
   private Instant outputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
@@ -65,12 +68,14 @@ public class SamzaTimerInternalsFactory<K> implements 
TimerInternalsFactory<K> {
       int timerBufferSize,
       String timerStateId,
       SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory,
-      Coder<BoundedWindow> windowCoder) {
+      Coder<BoundedWindow> windowCoder,
+      IsBounded isBounded) {
     this.keyCoder = keyCoder;
     this.timerRegistry = timerRegistry;
     this.timerBufferSize = timerBufferSize;
     this.eventTimeTimers = new TreeSet<>();
     this.state = new SamzaTimerState(timerStateId, 
nonKeyedStateInternalsFactory, windowCoder);
+    this.isBounded = isBounded;
   }
 
   static <K> SamzaTimerInternalsFactory<K> createTimerInternalFactory(
@@ -79,6 +84,7 @@ public class SamzaTimerInternalsFactory<K> implements 
TimerInternalsFactory<K> {
       String timerStateId,
       SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory,
       WindowingStrategy<?, BoundedWindow> windowingStrategy,
+      IsBounded isBounded,
       SamzaPipelineOptions pipelineOptions) {
 
     final Coder<BoundedWindow> windowCoder = 
windowingStrategy.getWindowFn().windowCoder();
@@ -89,7 +95,8 @@ public class SamzaTimerInternalsFactory<K> implements 
TimerInternalsFactory<K> {
         pipelineOptions.getTimerBufferSize(),
         timerStateId,
         nonKeyedStateInternalsFactory,
-        windowCoder);
+        windowCoder,
+        isBounded);
   }
 
   @Override
@@ -183,6 +190,13 @@ public class SamzaTimerInternalsFactory<K> implements 
TimerInternalsFactory<K> {
 
     @Override
     public void setTimer(TimerData timerData) {
+      if (isBounded == IsBounded.UNBOUNDED
+          && timerData.getTimestamp().getMillis()
+              >= GlobalWindow.INSTANCE.maxTimestamp().getMillis()) {
+        // No need to register a timer of max timestamp if the input is 
unbounded
+        return;
+      }
+
       final KeyedTimerData<K> keyedTimerData = new KeyedTimerData<>(keyBytes, 
key, timerData);
 
       // persist it first
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
index 5d90009..42a0d97 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
@@ -28,7 +28,7 @@ import 
org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.serialization.Base64Serializer;
 import org.apache.beam.runners.samza.SamzaExecutionEnvironment;
 import org.apache.beam.runners.samza.SamzaPipelineOptions;
-import org.apache.beam.runners.samza.SamzaRunnerOverrideConfigs;
+import org.apache.beam.runners.samza.container.BeamContainerRunner;
 import 
org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang3.StringUtils;
@@ -48,9 +48,13 @@ import org.apache.samza.runtime.RemoteApplicationRunner;
 import org.apache.samza.serializers.ByteSerdeFactory;
 import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 import org.apache.samza.zk.ZkJobCoordinatorFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Builder class to generate configs for BEAM samza runner during runtime. */
 public class ConfigBuilder {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ConfigBuilder.class);
+
   private static final String APP_RUNNER_CLASS = "app.runner.class";
   private static final String YARN_PACKAGE_PATH = "yarn.package.path";
   private static final String JOB_FACTORY_CLASS = "job.factory.class";
@@ -72,7 +76,8 @@ public class ConfigBuilder {
 
   public Config build() {
     try {
-      config.putAll(systemStoreConfig(options));
+      // apply framework configs
+      config.putAll(createSystemConfig(options));
 
       // apply user configs
       config.putAll(createUserConfig(options));
@@ -84,8 +89,7 @@ public class ConfigBuilder {
           "beamPipelineOptions",
           Base64Serializer.serializeUnchecked(new 
SerializablePipelineOptions(options)));
 
-      // TODO: remove after we sort out Samza task wrapper
-      config.put("samza.li.task.wrapper.enabled", "false");
+      validateConfigs(options, config);
 
       return new MapConfig(config);
     } catch (Exception e) {
@@ -93,14 +97,6 @@ public class ConfigBuilder {
     }
   }
 
-  private static boolean isEmptyUserConfig(Map<String, String> config) {
-    if (config == null) {
-      return true;
-    }
-    return config.keySet().stream()
-        .allMatch(key -> 
key.startsWith(SamzaRunnerOverrideConfigs.BEAM_RUNNER_CONFIG_PREFIX));
-  }
-
   private static Map<String, String> createUserConfig(SamzaPipelineOptions 
options)
       throws Exception {
     final Map<String, String> config = new HashMap<>();
@@ -110,11 +106,15 @@ public class ConfigBuilder {
 
     // If user provides a config file, use it as base configs.
     if (StringUtils.isNoneEmpty(configFilePath)) {
+      LOG.info("configFilePath: " + configFilePath);
+
       final File configFile = new File(configFilePath);
       final URI configUri = configFile.toURI();
       final ConfigFactory configFactory =
           options.getConfigFactory().getDeclaredConstructor().newInstance();
 
+      LOG.info("configFactory: " + configFactory.getClass().getName());
+
       // Config file must exist for default properties config
       // TODO: add check to all non-empty files once we don't need to
       // pass the command-line args through the containers
@@ -129,19 +129,6 @@ public class ConfigBuilder {
       config.putAll(options.getConfigOverride());
     }
 
-    switch (options.getSamzaExecutionEnvironment()) {
-      case YARN:
-        config.putAll(yarnRunConfig());
-        validateYarnRun(config);
-        break;
-      case STANDALONE:
-        config.putAll(standAloneRunConfig());
-        validateZKStandAloneRun(config);
-        break;
-      default: // LOCAL
-        config.putAll(localRunConfig());
-    }
-
     return config;
   }
 
@@ -183,9 +170,11 @@ public class ConfigBuilder {
         "Config %s not found for %s Deployment",
         YARN_PACKAGE_PATH,
         SamzaExecutionEnvironment.YARN);
+    final String appRunner = config.get(APP_RUNNER_CLASS);
     checkArgument(
-        !config.containsKey(APP_RUNNER_CLASS)
-            || 
config.get(APP_RUNNER_CLASS).equals(RemoteApplicationRunner.class.getName()),
+        appRunner == null
+            || RemoteApplicationRunner.class.getName().equals(appRunner)
+            || BeamContainerRunner.class.getName().equals(appRunner),
         "Config %s must be set to %s for %s Deployment",
         APP_RUNNER_CLASS,
         RemoteApplicationRunner.class.getName(),
@@ -195,12 +184,6 @@ public class ConfigBuilder {
         "Config %s not found for %s Deployment",
         JOB_FACTORY_CLASS,
         SamzaExecutionEnvironment.YARN);
-    checkArgument(
-        config.get(JOB_FACTORY_CLASS).equals(YarnJobFactory.class.getName()),
-        "Config %s must be set to %s for %s Deployment",
-        JOB_FACTORY_CLASS,
-        YarnJobFactory.class.getName(),
-        SamzaExecutionEnvironment.STANDALONE);
   }
 
   @VisibleForTesting
@@ -240,7 +223,7 @@ public class ConfigBuilder {
         .build();
   }
 
-  private static Map<String, String> systemStoreConfig(SamzaPipelineOptions 
options) {
+  private static Map<String, String> createSystemConfig(SamzaPipelineOptions 
options) {
     ImmutableMap.Builder<String, String> configBuilder =
         ImmutableMap.<String, String>builder()
             .put(
@@ -251,13 +234,46 @@ public class ConfigBuilder {
             .put("serializers.registry.byteSerde.class", 
ByteSerdeFactory.class.getName());
 
     if (options.getStateDurable()) {
+      LOG.info("stateDurable is enabled");
       configBuilder.put("stores.beamStore.changelog", 
getChangelogTopic(options, "beamStore"));
       configBuilder.put("job.host-affinity.enabled", "true");
     }
 
+    LOG.info("Execution environment is " + 
options.getSamzaExecutionEnvironment());
+    switch (options.getSamzaExecutionEnvironment()) {
+      case YARN:
+        configBuilder.putAll(yarnRunConfig());
+        break;
+      case STANDALONE:
+        configBuilder.putAll(standAloneRunConfig());
+        break;
+      default: // LOCAL
+        configBuilder.putAll(localRunConfig());
+        break;
+    }
+
+    // TODO: remove after we sort out Samza task wrapper
+    configBuilder.put("samza.li.task.wrapper.enabled", "false");
+
     return configBuilder.build();
   }
 
+  private static void validateConfigs(SamzaPipelineOptions options, 
Map<String, String> config) {
+
+    // validate execution environment
+    switch (options.getSamzaExecutionEnvironment()) {
+      case YARN:
+        validateYarnRun(config);
+        break;
+      case STANDALONE:
+        validateZKStandAloneRun(config);
+        break;
+      default:
+        // do nothing
+        break;
+    }
+  }
+
   static String getChangelogTopic(SamzaPipelineOptions options, String 
storeName) {
     return String.format(
         "%s-%s-%s-changelog", options.getJobName(), options.getJobInstance(), 
storeName);
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
index 13c6f11..0af0df9 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.samza.translation;
 
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.runners.core.SystemReduceFn;
@@ -83,7 +84,7 @@ class GroupByKeyTranslator<K, InputT, OutputT>
     final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn =
         getSystemReduceFn(transform, input.getPipeline(), kvInputCoder);
 
-    MessageStream<OpMessage<KV<K, OutputT>>> outputStream =
+    final MessageStream<OpMessage<KV<K, OutputT>>> outputStream =
         doTranslateGBK(
             inputStream,
             needRepartition(node, ctx),
@@ -93,7 +94,8 @@ class GroupByKeyTranslator<K, InputT, OutputT>
             elementCoder,
             ctx.getCurrentTopologicalId(),
             node.getFullName(),
-            outputTag);
+            outputTag,
+            input.isBounded());
 
     ctx.registerMessageStream(output, outputStream);
   }
@@ -114,12 +116,12 @@ class GroupByKeyTranslator<K, InputT, OutputT>
         ctx.getOneInputMessageStream(transform);
     final boolean needRepartition = 
ctx.getSamzaPipelineOptions().getMaxSourceParallelism() > 1;
     final WindowingStrategy<?, BoundedWindow> windowingStrategy =
-        SamzaPipelineTranslatorUtils.getPortableWindowStrategy(transform, 
pipeline);
-    Coder<BoundedWindow> windowCoder = 
windowingStrategy.getWindowFn().windowCoder();
+        ctx.getPortableWindowStrategy(transform, pipeline);
+    final Coder<BoundedWindow> windowCoder = 
windowingStrategy.getWindowFn().windowCoder();
 
-    String inputId = ctx.getInputId(transform);
-    WindowedValue.WindowedValueCoder<KV<K, InputT>> windowedInputCoder =
-        SamzaPipelineTranslatorUtils.instantiateCoder(inputId, 
pipeline.getComponents());
+    final String inputId = ctx.getInputId(transform);
+    final WindowedValue.WindowedValueCoder<KV<K, InputT>> windowedInputCoder =
+        ctx.instantiateCoder(inputId, pipeline.getComponents());
     final KvCoder<K, InputT> kvInputCoder = (KvCoder<K, InputT>) 
windowedInputCoder.getValueCoder();
     final Coder<WindowedValue<KV<K, InputT>>> elementCoder =
         WindowedValue.FullWindowedValueCoder.of(kvInputCoder, windowCoder);
@@ -134,7 +136,10 @@ class GroupByKeyTranslator<K, InputT, OutputT>
         (SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow>)
             SystemReduceFn.buffering(kvInputCoder.getValueCoder());
 
-    MessageStream<OpMessage<KV<K, OutputT>>> outputStream =
+    final RunnerApi.PCollection input = 
pipeline.getComponents().getPcollectionsOrThrow(inputId);
+    final PCollection.IsBounded isBounded = 
SamzaPipelineTranslatorUtils.isBounded(input);
+
+    final MessageStream<OpMessage<KV<K, OutputT>>> outputStream =
         doTranslateGBK(
             inputStream,
             needRepartition,
@@ -144,7 +149,8 @@ class GroupByKeyTranslator<K, InputT, OutputT>
             elementCoder,
             topologyId,
             nodeFullname,
-            outputTag);
+            outputTag,
+            isBounded);
     ctx.registerMessageStream(ctx.getOutputId(transform), outputStream);
   }
 
@@ -157,7 +163,8 @@ class GroupByKeyTranslator<K, InputT, OutputT>
       Coder<WindowedValue<KV<K, InputT>>> elementCoder,
       int topologyId,
       String nodeFullname,
-      TupleTag<KV<K, OutputT>> outputTag) {
+      TupleTag<KV<K, OutputT>> outputTag,
+      PCollection.IsBounded isBounded) {
     final MessageStream<OpMessage<KV<K, InputT>>> filteredInputStream =
         inputStream.filter(msg -> msg.getType() == OpMessage.Type.ELEMENT);
 
@@ -173,6 +180,7 @@ class GroupByKeyTranslator<K, InputT, OutputT>
                   KVSerde.of(
                       SamzaCoders.toSerde(kvInputCoder.getKeyCoder()),
                       SamzaCoders.toSerde(elementCoder)),
+                  // TODO: infer a fixed id from the name
                   "gbk-" + topologyId)
               .map(kv -> OpMessage.ofElement(kv.getValue()));
     }
@@ -194,7 +202,10 @@ class GroupByKeyTranslator<K, InputT, OutputT>
                         reduceFn,
                         windowingStrategy,
                         new DoFnOp.SingleOutputManagerFactory<>(),
-                        nodeFullname)));
+                        nodeFullname,
+                        // TODO: infer a fixed id from the name
+                        outputTag.getId(),
+                        isBounded)));
     return outputStream;
   }
 
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ImpulseTranslator.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ImpulseTranslator.java
index baa51ab..2ccbd6c 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ImpulseTranslator.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ImpulseTranslator.java
@@ -21,7 +21,9 @@ import 
org.apache.beam.runners.core.construction.graph.PipelineNode;
 import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
 import org.apache.beam.runners.samza.runtime.OpMessage;
 import org.apache.samza.operators.KV;
+import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.descriptors.GenericInputDescriptor;
 import org.apache.samza.system.descriptors.GenericSystemDescriptor;
 
@@ -40,8 +42,11 @@ public class ImpulseTranslator implements 
TransformTranslator {
     final String outputId = ctx.getOutputId(transform);
     final GenericSystemDescriptor systemDescriptor =
         new GenericSystemDescriptor(outputId, 
SamzaImpulseSystemFactory.class.getName());
+
+    // The KVSerde is needed here so that Samza does not crop the key.
+    final Serde<KV<?, OpMessage<byte[]>>> kvSerde = KVSerde.of(new 
NoOpSerde(), new NoOpSerde<>());
     final GenericInputDescriptor<KV<?, OpMessage<byte[]>>> inputDescriptor =
-        systemDescriptor.getInputDescriptor(outputId, new NoOpSerde<>());
+        systemDescriptor.getInputDescriptor(outputId, kvSerde);
 
     ctx.registerInputMessageStream(outputId, inputDescriptor);
   }
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
index e4651c4..ad699af 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
@@ -148,6 +148,9 @@ class ParDoBoundMultiTranslator<InT, OutT>
             idToPValueMap,
             new DoFnOp.MultiOutputManagerFactory(tagToIndexMap),
             node.getFullName(),
+            // TODO: infer a fixed id from the name
+            String.valueOf(ctx.getCurrentTopologicalId()),
+            input.isBounded(),
             false,
             null,
             Collections.emptyMap(),
@@ -237,6 +240,9 @@ class ParDoBoundMultiTranslator<InT, OutT>
     final DoFnSchemaInformation doFnSchemaInformation;
     doFnSchemaInformation = 
ParDoTranslation.getSchemaInformation(transform.getTransform());
 
+    final RunnerApi.PCollection input = 
pipeline.getComponents().getPcollectionsOrThrow(inputId);
+    final PCollection.IsBounded isBounded = 
SamzaPipelineTranslatorUtils.isBounded(input);
+
     final DoFnOp<InT, OutT, RawUnionValue> op =
         new DoFnOp<>(
             mainOutputTag,
@@ -250,6 +256,9 @@ class ParDoBoundMultiTranslator<InT, OutT>
             Collections.emptyMap(), // idToViewMap not in use until side input 
support
             new DoFnOp.MultiOutputManagerFactory(tagToIndexMap),
             nodeFullname,
+            // TODO: infer a fixed id from the name
+            String.valueOf(ctx.getCurrentTopologicalId()),
+            isBounded,
             true,
             stagePayload,
             idToTupleTagMap,
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPortablePipelineTranslator.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPortablePipelineTranslator.java
index 1f18a75..5974cb6 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPortablePipelineTranslator.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPortablePipelineTranslator.java
@@ -23,6 +23,7 @@ import java.util.ServiceLoader;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.graph.PipelineNode;
 import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
 import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,7 +63,8 @@ public class SamzaPortablePipelineTranslator {
     }
   }
 
-  public static void createConfig(RunnerApi.Pipeline pipeline, ConfigBuilder 
configBuilder) {
+  public static void createConfig(
+      RunnerApi.Pipeline pipeline, ConfigBuilder configBuilder, 
SamzaPipelineOptions options) {
     QueryablePipeline queryablePipeline =
         QueryablePipeline.forTransforms(
             pipeline.getRootTransformIdsList(), pipeline.getComponents());
@@ -72,7 +74,7 @@ public class SamzaPortablePipelineTranslator {
           TRANSLATORS.get(transform.getTransform().getSpec().getUrn());
       if (translator instanceof TransformConfigGenerator) {
         TransformConfigGenerator configGenerator = (TransformConfigGenerator) 
translator;
-        configBuilder.putAll(configGenerator.createPortableConfig(transform));
+        configBuilder.putAll(configGenerator.createPortableConfig(transform, 
options));
       }
     }
   }
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TransformConfigGenerator.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TransformConfigGenerator.java
index 7b03bb0..b6ac2ba 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TransformConfigGenerator.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TransformConfigGenerator.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.samza.translation;
 import java.util.Collections;
 import java.util.Map;
 import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
 
@@ -32,7 +33,8 @@ public interface TransformConfigGenerator<T extends 
PTransform<?, ?>> {
   }
 
   /** Generate config for portable api PTransform. */
-  default Map<String, String> createPortableConfig(PipelineNode.PTransformNode 
transform) {
+  default Map<String, String> createPortableConfig(
+      PipelineNode.PTransformNode transform, SamzaPipelineOptions options) {
     return Collections.emptyMap();
   }
 }
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineTranslatorUtils.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineTranslatorUtils.java
index 781ca68..50cccfe 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineTranslatorUtils.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineTranslatorUtils.java
@@ -26,6 +26,7 @@ import 
org.apache.beam.runners.core.construction.graph.QueryablePipeline;
 import org.apache.beam.runners.fnexecution.wire.WireCoders;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import 
org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
@@ -74,4 +75,20 @@ public final class SamzaPipelineTranslatorUtils {
         (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
     return ret;
   }
+
+  /**
+   * Escapes a name so that a physical stream can be created out of it.
+   *
+   * <p>Replaces ".", "(" and "/" with "-", and then removes every remaining
+   * character that is not an ASCII letter, a digit, "-" or "_".
+   */
+  public static String escape(String name) {
+    return name.replaceAll("[\\.(/]", "-").replaceAll("[^A-Za-z0-9-_]", "");
+  }
+
+  public static PCollection.IsBounded isBounded(RunnerApi.PCollection 
pCollection) {
+    return pCollection.getIsBounded() == RunnerApi.IsBounded.Enum.BOUNDED
+        ? PCollection.IsBounded.BOUNDED
+        : PCollection.IsBounded.UNBOUNDED;
+  }
 }
diff --git 
a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
 
b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
index 25ef315..9328676 100644
--- 
a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
+++ 
b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
@@ -36,6 +36,7 @@ import org.apache.beam.runners.samza.SamzaPipelineOptions;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.samza.config.MapConfig;
@@ -78,7 +79,7 @@ public class SamzaTimerInternalsFactoryTest {
     final TupleTag<?> mainOutputTag = new TupleTag<>("output");
 
     return SamzaStoreStateInternals.createStateInternalFactory(
-        null, context, pipelineOptions, null, mainOutputTag);
+        "42", null, context, pipelineOptions, null);
   }
 
   private static SamzaTimerInternalsFactory<String> 
createTimerInternalsFactory(
@@ -96,6 +97,7 @@ public class SamzaTimerInternalsFactoryTest {
         timerStateId,
         nonKeyedStateInternalsFactory,
         (WindowingStrategy) WindowingStrategy.globalDefault(),
+        PCollection.IsBounded.BOUNDED,
         pipelineOptions);
   }
 
diff --git a/settings.gradle b/settings.gradle
index c453272..daeb827 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -81,6 +81,8 @@ include "beam-runners-spark"
 project(":beam-runners-spark").dir = file("runners/spark")
 include "beam-runners-samza"
 project(":beam-runners-samza").dir = file("runners/samza")
+include "beam-runners-samza-job-server"
+project(":beam-runners-samza-job-server").dir = 
file("runners/samza/job-server")
 include "beam-sdks-go"
 project(":beam-sdks-go").dir = file("sdks/go")
 include "beam-sdks-go-container"

Reply via email to