http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/test/java/org/apache/beam/runners/spark/stateful/SparkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/stateful/SparkStateInternalsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/stateful/SparkStateInternalsTest.java
deleted file mode 100644
index b4597f9..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/stateful/SparkStateInternalsTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.stateful;
-
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateInternalsTest;
-import org.junit.Ignore;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link SparkStateInternals}. This is based on {@link StateInternalsTest}.
- * Ignore set and map tests.
- */
-@RunWith(JUnit4.class)
-public class SparkStateInternalsTest extends StateInternalsTest {
-
-  @Override
-  protected StateInternals createStateInternals() {
-    return SparkStateInternals.forKey("dummyKey");
-  }
-
-  @Override
-  @Ignore
-  public void testSet() {}
-
-  @Override
-  @Ignore
-  public void testSetIsEmpty() {}
-
-  @Override
-  @Ignore
-  public void testMergeSetIntoSource() {}
-
-  @Override
-  @Ignore
-  public void testMergeSetIntoNewNamespace() {}
-
-  @Override
-  @Ignore
-  public void testMap() {}
-
-  @Override
-  @Ignore
-  public void testSetReadable() {}
-
-  @Override
-  @Ignore
-  public void testMapReadable() {}
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
index 8bd6dae..8f2e681 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
@@ -58,12 +58,12 @@ public class StorageLevelTest {
 
   @Test
   public void test() throws Exception {
-    PCollection<String> pCollection = pipeline.apply("CreateFoo", Create.of("foo"));
+    PCollection<String> pCollection = pipeline.apply(Create.of("foo"));
 
     // By default, the Spark runner doesn't cache the RDD if it is accessed only one time.
     // So, to "force" the caching of the RDD, we have to call the RDD at least two times.
     // That's why we are using the Count fn on the PCollection.
-    pCollection.apply("CountAll", Count.<String>globally());
+    pCollection.apply(Count.<String>globally());
 
     PCollection<String> output = pCollection.apply(new StorageLevelPTransform());
 

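The comment in this test captures a Spark-specific caching rule: the runner persists an RDD only when the pipeline references it more than once. A minimal sketch of the pattern, assuming only the test's own pieces (pipeline, Create, Count, StorageLevelPTransform):

    // Reference the PCollection twice so the Spark runner persists the backing RDD.
    PCollection<String> pCollection = pipeline.apply(Create.of("foo"));
    pCollection.apply(Count.<String>globally());       // first consumer
    pCollection.apply(new StorageLevelPTransform());   // second consumer reads the persisted RDD
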
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/common/fn-api/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/pom.xml b/sdks/common/fn-api/pom.xml
index 6810667..77a9ba5 100644
--- a/sdks/common/fn-api/pom.xml
+++ b/sdks/common/fn-api/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-common-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
index 9da5afe..9fe2b2f 100644
--- a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
+++ b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
@@ -38,6 +38,7 @@ option java_package = "org.apache.beam.fn.v1";
 option java_outer_classname = "BeamFnApi";
 
 import "beam_runner_api.proto";
+import "google/protobuf/any.proto";
 import "google/protobuf/timestamp.proto";
 
 /*
@@ -66,6 +67,122 @@ message Target {
   string name = 2;
 }
 
+// Information defining a PCollection
+message PCollection {
+  // (Required) A reference to a coder.
+  string coder_reference = 1;
+
+  // TODO: Windowing strategy, ...
+}
+
+// A primitive transform within Apache Beam.
+message PrimitiveTransform {
+  // (Required) A pipeline level unique id which can be used as a reference to
+  // refer to this.
+  string id = 1;
+
+  // (Required) A function spec that is used by this primitive
+  // transform to process data.
+  FunctionSpec function_spec = 2;
+
+  // A map of distinct input names to target definitions.
+  // For example, in CoGbk this represents the tag name associated with each
+  // distinct input name and a list of primitive transforms that are associated
+  // with the specified input.
+  map<string, Target.List> inputs = 3;
+
+  // A map from local output name to PCollection definitions. For example, in
+  // DoFn this represents the tag name associated with each distinct output.
+  map<string, PCollection> outputs = 4;
+
+  // TODO: Should we model side inputs as a special type of input for a
+  // primitive transform or should it be modeled as the relationship that
+  // the predecessor input will be a view primitive transform.
+  // A map from side input names to side inputs.
+  map<string, SideInput> side_inputs = 5;
+
+  // The user name of this step.
+  // TODO: This should really be in display data and not at this level
+  string step_name = 6;
+}
+
+/*
+ * User Definable Functions
+ *
+ * This is still unstable mainly due to how we model the side input.
+ */
+
+// Defines the common elements of user-definable functions, to allow the SDK to
+// express the information the runner needs to execute work.
+// Stable
+message FunctionSpec {
+  // (Required) A pipeline level unique id which can be used as a reference to
+  // refer to this.
+  string id = 1;
+
+  // (Required) A globally unique name representing this user definable
+  // function.
+  //
+  // User definable functions use the urn encodings registered such that another
+  // may implement the user definable function within another language.
+  //
+  // For example:
+  //    urn:org.apache.beam:coder:kv:1.0
+  string urn = 2;
+
+  // (Required) Reference to specification of execution environment required to
+  // invoke this function.
+  string environment_reference = 3;
+
+  // Data used to parameterize this function. Depending on the urn, this may be
+  // optional or required.
+  google.protobuf.Any data = 4;
+}
+
+message SideInput {
+  // TODO: Coder?
+
+  // For RunnerAPI.
+  Target input = 1;
+
+  // For FnAPI.
+  FunctionSpec view_fn = 2;
+}
+
+// Defines how to encode values into byte streams and decode values from byte
+// streams. A coder can be parameterized by additional properties which may or
+// may not be language agnostic.
+//
+// Coders using the urn:org.apache.beam:coder namespace must have their
+// encodings registered such that another may implement the encoding within
+// another language.
+//
+// For example:
+//    urn:org.apache.beam:coder:kv:1.0
+//    urn:org.apache.beam:coder:iterable:1.0
+// Stable
+message Coder {
+  // TODO: This looks weird when compared to the other function specs
+  // which use URN to differentiate themselves. Should "Coder" be embedded
+  // inside the FunctionSpec data block?
+
+  // The data associated with this coder used to reconstruct it.
+  FunctionSpec function_spec = 1;
+
+  // A list of component coder references.
+  //
+  // For a key-value coder, there must be exactly two component coder references
+  // where the first reference represents the key coder and the second reference
+  // is the value coder.
+  //
+  // For an iterable coder, there must be exactly one component coder reference
+  // representing the value coder.
+  //
+  // TODO: Perhaps this is redundant with the data of the FunctionSpec
+  // for known coders?
+  repeated string component_coder_reference = 2;
+}
+
 // A descriptor for connecting to a remote port using the Beam Fn Data API.
 // Allows for communication between two environments (for example between the
 // runner and the SDK).
@@ -154,20 +271,29 @@ message ProcessBundleDescriptor {
   // refer to this.
   string id = 1;
 
+  // (Deprecated) A list of primitive transforms that should
+  // be used to construct the bundle processing graph.
+  repeated PrimitiveTransform primitive_transform = 2;
+
+  // (Deprecated) The set of all coders referenced in this bundle.
+  repeated Coder coders = 4;
+
   // (Required) A map from pipeline-scoped id to PTransform.
-  map<string, org.apache.beam.runner_api.v1.PTransform> transforms = 2;
+  map<string, org.apache.beam.runner_api.v1.PTransform> transforms = 5;
 
   // (Required) A map from pipeline-scoped id to PCollection.
-  map<string, org.apache.beam.runner_api.v1.PCollection> pcollections = 3;
+  map<string, org.apache.beam.runner_api.v1.PCollection> pcollections = 6;
 
   // (Required) A map from pipeline-scoped id to WindowingStrategy.
-  map<string, org.apache.beam.runner_api.v1.WindowingStrategy> windowing_strategies = 4;
+  map<string, org.apache.beam.runner_api.v1.WindowingStrategy> windowing_strategies = 7;
 
   // (Required) A map from pipeline-scoped id to Coder.
-  map<string, org.apache.beam.runner_api.v1.Coder> coders = 5;
+  // TODO: Rename to "coders" once deprecated coders field is removed. Unique
+  // name is chosen to make it an easy search/replace.
+  map<string, org.apache.beam.runner_api.v1.Coder> codersyyy = 8;
 
   // (Required) A map from pipeline-scoped id to Environment.
-  map<string, org.apache.beam.runner_api.v1.Environment> environments = 6;
+  map<string, org.apache.beam.runner_api.v1.Environment> environments = 9;
 }
 
 // A request to process a given bundle.
@@ -177,9 +303,9 @@ message ProcessBundleRequest {
   // instantiated and executed by the SDK harness.
   string process_bundle_descriptor_reference = 1;
 
-  // (Optional) A list of cache tokens that can be used by an SDK to reuse
-  // cached data returned by the State API across multiple bundles.
-  repeated bytes cache_tokens = 2;
+  // (Optional) A list of cache tokens that can be used by an SDK to cache
+  // data looked up using the State API across multiple bundles.
+  repeated CacheToken cache_tokens = 2;
 }
 
 // Stable
@@ -248,14 +374,14 @@ message PrimitiveTransformSplit {
   //
   // For example, a remote GRPC source will have a specific urn and data
   // block containing an ElementCountRestriction.
-  org.apache.beam.runner_api.v1.FunctionSpec completed_restriction = 2;
+  FunctionSpec completed_restriction = 2;
 
   // (Required) A function specification describing the restriction
   // representing the remainder of work for the primitive transform.
   //
  // For example, a remote GRPC source will have a specific urn and data
  // block containing an ElementCountSkipRestriction.
-  org.apache.beam.runner_api.v1.FunctionSpec remaining_restriction = 3;
+  FunctionSpec remaining_restriction = 3;
 }
 
 message ProcessBundleSplitResponse {
@@ -402,10 +528,6 @@ message StateResponse {
   // failed.
   string error = 2;
 
-  // (Optional) If this is specified, then the result of this state request
-  // can be cached using the supplied token.
-  bytes cache_token = 3;
-
   // A corresponding response matching the request will be populated.
   oneof response {
     // A response to getting state.
@@ -431,44 +553,49 @@ service BeamFnState {
   ) {}
 }
 
+message CacheToken {
+  // (Required) Represents the function spec and tag associated with this state
+  // key.
+  //
+  // By combining the function_spec_reference with the tag representing:
+  //   * the input, we refer to the iterable portion of a large GBK
+  //   * the side input, we refer to the side input
+  //   * the user state, we refer to user state
+  Target target = 1;
+
+  // (Required) An opaque identifier.
+  bytes token = 2;
+}
+
 message StateKey {
-  message Runner {
-    // (Required) Opaque information supplied by the runner. Used to support
-    // remote references.
-    bytes key = 1;
-  }
+  // (Required) Represents the function spec and tag associated with this state
+  // key.
+  //
+  // By combining the function_spec_reference with the tag representing:
+  //   * the input, we refer to fetching the iterable portion of a large GBK
+  //   * the side input, we refer to fetching the side input
+  //   * the user state, we refer to fetching user state
+  Target target = 1;
 
-  message MultimapSideInput {
-    // (Required) The id of the PTransform containing a side input.
-    string ptransform_id = 1;
-    // (Required) The id of the side input.
-    string side_input_id = 2;
-    // (Required) The window (after mapping the currently executing elements
-    // window into the side input windows domain) encoded in a nested context.
-    bytes window = 3;
-    // (Required) The key encoded in a nested context.
-    bytes key = 4;
-  }
+  // (Required) The bytes of the window which this state request is for encoded
+  // in the nested context.
+  bytes window = 2;
 
-  message BagUserState {
-    // (Required) The id of the PTransform containing user state.
-    string ptransform_id = 1;
-    // (Required) The id of the user state.
-    string user_state_id = 2;
-    // (Required) The window encoded in a nested context. 
-    bytes window = 3;
-    // (Required) The key of the currently executing element encoded in a
-    // nested context.
-    bytes key = 4;
-  }
+  // (Required) The user key encoded in the nested context.
+  bytes key = 3;
+}
 
-  // (Required) One of the following state keys must be set.
-  oneof type {
-    Runner runner = 1;
-    MultimapSideInput multimap_side_input = 2;
-    BagUserState bag_user_state = 3;
-    // TODO: represent a state key for user map state
-  }
+// A logical byte stream which can be continued using the state API.
+message ContinuableStream {
+  // (Optional) If specified, represents a token which can be used with the
+  // state API to get the next chunk of this logical byte stream. The end of
+  // the logical byte stream is signalled by this field being unset.
+  bytes continuation_token = 1;
+
+  // Represents a part of a logical byte stream. Elements within
+  // the logical byte stream are encoded in the nested context and
+  // concatenated together.
+  bytes data = 2;
 }
 
 // A request to get state.
@@ -481,18 +608,10 @@ message StateGetRequest {
   bytes continuation_token = 1;
 }
 
-// A response to get state representing a logical byte stream which can be
-// continued using the state API.
+// A response to get state.
 message StateGetResponse {
-  // (Optional) If specified, represents a token which can be used with the
-  // state API to get the next chunk of this logical byte stream. The end of
-  // the logical byte stream is signalled by this field being unset.
-  bytes continuation_token = 1;
-
-  // Represents a part of a logical byte stream. Elements within
-  // the logical byte stream are encoded in the nested context and
-  // concatenated together.
-  bytes data = 2;
+  // (Required) The response containing a continuable logical byte stream.
+  ContinuableStream stream = 1;
 }
 
 // A request to append state.

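The StateGetRequest and ContinuableStream messages above define a paging protocol: each response carries one chunk of a logical byte stream plus an optional continuation token, and an unset token marks the end. A sketch of draining such a stream, assuming the Java bindings generated from this file (package org.apache.beam.fn.v1, outer class BeamFnApi, per the options above); fetch() is a hypothetical stand-in for one State API round trip:

    // Drain a logical byte stream chunk by chunk using continuation tokens.
    java.io.ByteArrayOutputStream all = new java.io.ByteArrayOutputStream();
    BeamFnApi.StateGetRequest request = BeamFnApi.StateGetRequest.getDefaultInstance();
    while (true) {
      BeamFnApi.StateGetResponse response = fetch(request);   // hypothetical State API call
      BeamFnApi.ContinuableStream stream = response.getStream();
      stream.getData().writeTo(all);                          // append this chunk
      if (stream.getContinuationToken().isEmpty()) {
        break;                                                // unset token: end of stream
      }
      request = BeamFnApi.StateGetRequest.newBuilder()
          .setContinuationToken(stream.getContinuationToken())
          .build();
    }
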
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/common/pom.xml b/sdks/common/pom.xml
index 40eefa7..c621ed5 100644
--- a/sdks/common/pom.xml
+++ b/sdks/common/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/common/runner-api/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/pom.xml b/sdks/common/runner-api/pom.xml
index 8bc4123..f5536a7 100644
--- a/sdks/common/runner-api/pom.xml
+++ b/sdks/common/runner-api/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-common-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 711da2a..039ecb0 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -367,12 +367,9 @@ message WriteFilesPayload {
   // (Required) The SdkFunctionSpec of the FileBasedSink.
   SdkFunctionSpec sink = 1;
 
-  // (Required) The format function.
-  SdkFunctionSpec format_function = 2;
+  bool windowed_writes = 2;
 
-  bool windowed_writes = 3;
-
-  bool runner_determined_sharding = 4;
+  bool runner_determined_sharding = 3;
 }
 
 // A coder, the binary format for serialization and deserialization of data in
@@ -436,14 +433,6 @@ message WindowingStrategy {
   // (Required) The duration, in milliseconds, beyond the end of a window at
   // which the window becomes droppable.
   int64 allowed_lateness = 8;
-
-  // (Required) Indicate whether empty on-time panes should be omitted.
-  OnTimeBehavior OnTimeBehavior = 9;
-
-  // (Required) Whether or not the window fn assigns inputs to exactly one window
-  //
-  // This knowledge is required for some optimizations
-  bool assigns_to_one_window = 10;
 }
 
 // Whether or not a PCollection's WindowFn is non-merging, merging, or
@@ -489,17 +478,6 @@ enum ClosingBehavior {
   EMIT_IF_NONEMPTY = 1;
 }
 
-// Controls whether or not an aggregating transform should output data
-// when an on-time pane is empty.
-enum OnTimeBehavior {
-  // Always fire the on-time pane. Even if there is no new data since
-  // the previous firing, an element will be produced.
-  FIRE_ALWAYS = 0;
-
-  // Only fire the on-time pane if there is new data since the previous firing.
-  FIRE_IF_NONEMPTY = 1;
-}
-
 // When a number of windowed, timestamped inputs are aggregated, the timestamp
 // for the resulting output.
 enum OutputTime {

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/build-tools/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/pom.xml b/sdks/java/build-tools/pom.xml
index d7d25f6..5a2c498 100644
--- a/sdks/java/build-tools/pom.xml
+++ b/sdks/java/build-tools/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
index 0c9080d..3430750 100644
--- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
@@ -412,13 +412,4 @@
     <!-- PTransforms do not actually support serialization. -->
   </Match>
 
-  <Match>
-    <Class name="org.apache.beam.sdk.options.ProxyInvocationHandler"/>
-    <Field name="~.*"/>
-    <Bug pattern="SE_BAD_FIELD"/>
-    <!--
-      ProxyInvocationHandler implements Serializable only for the sake of 
throwing an informative
-      exception in writeObject()
-    -->
-  </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 3f12dc4..11b68e6 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ShardedKeyCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ShardedKeyCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ShardedKeyCoder.java
deleted file mode 100644
index a86b198..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ShardedKeyCoder.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.coders;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.values.ShardedKey;
-
-/** A {@link Coder} for {@link ShardedKey}, using a wrapped key {@link Coder}. */
-@VisibleForTesting
-public class ShardedKeyCoder<KeyT> extends StructuredCoder<ShardedKey<KeyT>> {
-  public static <KeyT> ShardedKeyCoder<KeyT> of(Coder<KeyT> keyCoder) {
-    return new ShardedKeyCoder<>(keyCoder);
-  }
-
-  private final Coder<KeyT> keyCoder;
-  private final VarIntCoder shardNumberCoder;
-
-  protected ShardedKeyCoder(Coder<KeyT> keyCoder) {
-    this.keyCoder = keyCoder;
-    this.shardNumberCoder = VarIntCoder.of();
-  }
-
-  @Override
-  public List<? extends Coder<?>> getCoderArguments() {
-    return Arrays.asList(keyCoder);
-  }
-
-  @Override
-  public void encode(ShardedKey<KeyT> key, OutputStream outStream)
-      throws IOException {
-    keyCoder.encode(key.getKey(), outStream);
-    shardNumberCoder.encode(key.getShardNumber(), outStream);
-  }
-
-  @Override
-  public ShardedKey<KeyT> decode(InputStream inStream)
-      throws IOException {
-    return ShardedKey.of(keyCoder.decode(inStream), shardNumberCoder.decode(inStream));
-  }
-
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    keyCoder.verifyDeterministic();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 89cadbd..4143db2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableMap;
@@ -34,7 +35,6 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.Read.Bounded;
 import org.apache.beam.sdk.io.fs.ResourceId;
@@ -43,7 +43,6 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.values.PBegin;
@@ -53,19 +52,18 @@ import org.apache.beam.sdk.values.PDone;
 /**
  * {@link PTransform}s for reading and writing Avro files.
  *
- * <p>To read a {@link PCollection} from one or more Avro files, use {@code AvroIO.read()}, using
- * {@link AvroIO.Read#from} to specify the filename or filepattern to read from. See {@link
- * FileSystems} for information on supported file systems and filepatterns.
+ * <p>To read a {@link PCollection} from one or more Avro files, use {@code AvroIO.read()},
+ * using {@link AvroIO.Read#from} to specify the filename or filepattern to read from.
+ * See {@link FileSystems} for information on supported file systems and filepatterns.
  *
- * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read
- * {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes a
- * {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a
+ * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}.
+ * To read {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes
+ * a {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a
  * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
  * schema.
  *
  * <p>For example:
- *
- * <pre>{@code
+ * <pre> {@code
  * Pipeline p = ...;
  *
  * // A simple Read of a local file (only runs locally):
@@ -77,33 +75,34 @@ import org.apache.beam.sdk.values.PDone;
  * PCollection<GenericRecord> records =
  *     p.apply(AvroIO.readGenericRecords(schema)
  *                .from("gs://my_bucket/path/to/records-*.avro"));
- * }</pre>
+ * } </pre>
  *
  * <p>To write a {@link PCollection} to one or more Avro files, use {@link AvroIO.Write}, using
- * {@code AvroIO.write().to(String)} to specify the output filename prefix. The default {@link
- * DefaultFilenamePolicy} will use this prefix, in conjunction with a {@link ShardNameTemplate} (set
- * via {@link Write#withShardNameTemplate(String)}) and optional filename suffix (set via {@link
- * Write#withSuffix(String)}, to generate output filenames in a sharded way. You can override this
- * default write filename policy using {@link Write#to(FileBasedSink.FilenamePolicy)} to specify a
- * custom file naming policy.
+ * {@code AvroIO.write().to(String)} to specify the output filename prefix. The default
+ * {@link DefaultFilenamePolicy} will use this prefix, in conjunction with a
+ * {@link ShardNameTemplate} (set via {@link Write#withShardNameTemplate(String)}) and optional
+ * filename suffix (set via {@link Write#withSuffix(String)}, to generate output filenames in a
+ * sharded way. You can override this default write filename policy using
+ * {@link Write#withFilenamePolicy(FileBasedSink.FilenamePolicy)} to specify a custom file naming
+ * policy.
  *
  * <p>By default, all input is put into the global window before writing. If per-window writes are
- * desired - for example, when using a streaming runner - {@link AvroIO.Write#withWindowedWrites()}
- * will cause windowing and triggering to be preserved. When producing windowed writes with a
- * streaming runner that supports triggers, the number of output shards must be set explicitly using
- * {@link AvroIO.Write#withNumShards(int)}; some runners may set this for you to a runner-chosen
- * value, so you may not need to set it yourself. A {@link FileBasedSink.FilenamePolicy} must be set,
- * and unique windows and triggers must produce unique filenames.
+ * desired - for example, when using a streaming runner -
+ * {@link AvroIO.Write#withWindowedWrites()} will cause windowing and triggering to be
+ * preserved. When producing windowed writes, the number of output shards must be set explicitly
+ * using {@link AvroIO.Write#withNumShards(int)}; some runners may set this for you to a
+ * runner-chosen value, so you may not need to set it yourself. A
+ * {@link FileBasedSink.FilenamePolicy} must be set, and unique windows and triggers must produce
+ * unique filenames.
  *
- * <p>To write specific records, such as Avro-generated classes, use {@link #write(Class)}. To write
- * {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)} which takes
- * a {@link Schema} object, or {@link #writeGenericRecords(String)} which takes a schema in a
- * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
- * schema.
+ * <p>To write specific records, such as Avro-generated classes, use {@link #write(Class)}.
+ * To write {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)}
+ * which takes a {@link Schema} object, or {@link #writeGenericRecords(String)} which takes a schema
+ * in a JSON-encoded string form. An exception will be thrown if a record doesn't match the
+ * specified schema.
  *
  * <p>For example:
- *
- * <pre>{@code
+ * <pre> {@code
  * // A simple Write to a local file (only runs locally):
  * PCollection<AvroAutoGenClass> records = ...;
  * records.apply(AvroIO.write(AvroAutoGenClass.class).to("/path/to/file.avro"));
@@ -114,11 +113,11 @@ import org.apache.beam.sdk.values.PDone;
  * records.apply("WriteToAvro", AvroIO.writeGenericRecords(schema)
  *     .to("gs://my_bucket/path/to/numbers")
  *     .withSuffix(".avro"));
- * }</pre>
+ * } </pre>
  *
- * <p>By default, {@link AvroIO.Write} produces output files that are compressed using the {@link
- * org.apache.avro.file.Codec CodecFactory.deflateCodec(6)}. This default can be changed or
- * overridden using {@link AvroIO.Write#withCodec}.
+ * <p>By default, {@link AvroIO.Write} produces output files that are compressed using the
+ * {@link org.apache.avro.file.Codec CodecFactory.deflateCodec(6)}. This default can
+ * be changed or overridden using {@link AvroIO.Write#withCodec}.
  */
 public class AvroIO {
   /**
@@ -259,16 +258,11 @@ public class AvroIO {
     @Nullable abstract ValueProvider<ResourceId> getFilenamePrefix();
     @Nullable abstract String getShardTemplate();
     @Nullable abstract String getFilenameSuffix();
-
-    @Nullable
-    abstract ValueProvider<ResourceId> getTempDirectory();
-
     abstract int getNumShards();
     @Nullable abstract Class<T> getRecordClass();
     @Nullable abstract Schema getSchema();
     abstract boolean getWindowedWrites();
     @Nullable abstract FilenamePolicy getFilenamePolicy();
-
     /**
     * The codec used to encode the blocks in the Avro file. String value drawn from those in
     * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
@@ -283,9 +277,6 @@ public class AvroIO {
     abstract static class Builder<T> {
      abstract Builder<T> setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix);
      abstract Builder<T> setFilenameSuffix(String filenameSuffix);
-
-      abstract Builder<T> setTempDirectory(ValueProvider<ResourceId> tempDirectory);
-
       abstract Builder<T> setNumShards(int numShards);
       abstract Builder<T> setShardTemplate(String shardTemplate);
       abstract Builder<T> setRecordClass(Class<T> recordClass);
@@ -305,9 +296,9 @@ public class AvroIO {
      * <p>The name of the output files will be determined by the {@link FilenamePolicy} used.
      *
      * <p>By default, a {@link DefaultFilenamePolicy} will build output filenames using the
-     * specified prefix, a shard name template (see {@link #withShardNameTemplate(String)}, and a
-     * common suffix (if supplied using {@link #withSuffix(String)}). This default can be overridden
-     * using {@link #to(FilenamePolicy)}.
+     * specified prefix, a shard name template (see {@link #withShardNameTemplate(String)}, and
+     * a common suffix (if supplied using {@link #withSuffix(String)}). This default can be
+     * overridden using {@link #withFilenamePolicy(FilenamePolicy)}.
      */
     public Write<T> to(String outputPrefix) {
       return to(FileBasedSink.convertToFileResourceIfPossible(outputPrefix));
@@ -315,21 +306,14 @@ public class AvroIO {
 
     /**
      * Writes to file(s) with the given output prefix. See {@link FileSystems} for information on
-     * supported file systems. This prefix is used by the {@link DefaultFilenamePolicy} to generate
-     * filenames.
-     *
-     * <p>By default, a {@link DefaultFilenamePolicy} will build output filenames using the
-     * specified prefix, a shard name template (see {@link #withShardNameTemplate(String)}, and a
-     * common suffix (if supplied using {@link #withSuffix(String)}). This default can be overridden
-     * using {@link #to(FilenamePolicy)}.
+     * supported file systems.
      *
-     * <p>This default policy can be overridden using {@link #to(FilenamePolicy)}, in which case
-     * {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should not be set.
-     * Custom filename policies do not automatically see this prefix - you should explicitly pass
-     * the prefix into your {@link FilenamePolicy} object if you need this.
+     * <p>The name of the output files will be determined by the {@link FilenamePolicy} used.
      *
-     * <p>If {@link #withTempDirectory} has not been called, this filename prefix will be used to
-     * infer a directory for temporary files.
+     * <p>By default, a {@link DefaultFilenamePolicy} will build output filenames using the
+     * specified prefix, a shard name template (see {@link #withShardNameTemplate(String)}, and
+     * a common suffix (if supplied using {@link #withSuffix(String)}). This default can be
+     * overridden using {@link #withFilenamePolicy(FilenamePolicy)}.
      */
     @Experimental(Kind.FILESYSTEM)
     public Write<T> to(ResourceId outputPrefix) {
@@ -358,22 +342,15 @@ public class AvroIO {
     }
 
     /**
-     * Writes to files named according to the given {@link FileBasedSink.FilenamePolicy}. A
-     * directory for temporary files must be specified using {@link #withTempDirectory}.
+     * Configures the {@link FileBasedSink.FilenamePolicy} that will be used to name written files.
      */
-    public Write<T> to(FilenamePolicy filenamePolicy) {
+    public Write<T> withFilenamePolicy(FilenamePolicy filenamePolicy) {
       return toBuilder().setFilenamePolicy(filenamePolicy).build();
     }
 
-    /** Set the base directory used to generate temporary files. */
-    @Experimental(Kind.FILESYSTEM)
-    public Write<T> withTempDirectory(ValueProvider<ResourceId> tempDirectory) {
-      return toBuilder().setTempDirectory(tempDirectory).build();
-    }
-
     /**
      * Uses the given {@link ShardNameTemplate} for naming output files. This option may only be
-     * used when using one of the default filename-prefix to() overrides.
+     * used when {@link #withFilenamePolicy(FilenamePolicy)} has not been configured.
      *
      * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
      * used.
@@ -383,8 +360,8 @@ public class AvroIO {
     }
 
     /**
-     * Configures the filename suffix for written files. This option may only be used when using one
-     * of the default filename-prefix to() overrides.
+     * Configures the filename suffix for written files. This option may only be used when
+     * {@link #withFilenamePolicy(FilenamePolicy)} has not been configured.
      *
      * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
      * used.
@@ -425,8 +402,9 @@ public class AvroIO {
     /**
      * Preserves windowing of input elements and writes them to files based on the element's window.
      *
-     * <p>If using {@link #to(FileBasedSink.FilenamePolicy)}. Filenames will be generated using
-     * {@link FilenamePolicy#windowedFilename}. See also {@link WriteFiles#withWindowedWrites()}.
+     * <p>Requires use of {@link #withFilenamePolicy(FileBasedSink.FilenamePolicy)}. Filenames will
+     * be generated using {@link FilenamePolicy#windowedFilename}. See also
+     * {@link WriteFiles#withWindowedWrites()}.
      */
     public Write<T> withWindowedWrites() {
       return toBuilder().setWindowedWrites(true).build();
@@ -457,46 +435,32 @@ public class AvroIO {
       return toBuilder().setMetadata(ImmutableMap.copyOf(metadata)).build();
     }
 
-    DynamicDestinations<T, Void> resolveDynamicDestinations() {
-      FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
-      if (usedFilenamePolicy == null) {
-        usedFilenamePolicy =
-            DefaultFilenamePolicy.fromStandardParameters(
-                getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), getWindowedWrites());
-      }
-      return DynamicFileDestinations.constant(usedFilenamePolicy);
-    }
-
     @Override
     public PDone expand(PCollection<T> input) {
-      checkArgument(
-          getFilenamePrefix() != null || getTempDirectory() != null,
-          "Need to set either the filename prefix or the tempDirectory of a AvroIO.Write "
-              + "transform.");
-      if (getFilenamePolicy() != null) {
-        checkArgument(
-            getShardTemplate() == null && getFilenameSuffix() == null,
-            "shardTemplate and filenameSuffix should only be used with the default "
-                + "filename policy");
-      }
-      return expandTyped(input, resolveDynamicDestinations());
-    }
+      checkState(getFilenamePrefix() != null,
+          "Need to set the filename prefix of an AvroIO.Write transform.");
+      checkState(
+          (getFilenamePolicy() == null)
+              || (getShardTemplate() == null && getFilenameSuffix() == null),
+          "Cannot set a filename policy and also a filename template or suffix.");
+      checkState(getSchema() != null,
+          "Need to set the schema of an AvroIO.Write transform.");
+      checkState(!getWindowedWrites() || (getFilenamePolicy() != null),
+          "When using windowed writes, a filename policy must be set via withFilenamePolicy().");
 
-    public <DestinationT> PDone expandTyped(
-        PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) {
-      ValueProvider<ResourceId> tempDirectory = getTempDirectory();
-      if (tempDirectory == null) {
-        tempDirectory = getFilenamePrefix();
+      FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
+      if (usedFilenamePolicy == null) {
+        usedFilenamePolicy = DefaultFilenamePolicy.constructUsingStandardParameters(
+            getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), getWindowedWrites());
       }
-      WriteFiles<T, DestinationT, T> write =
-          WriteFiles.to(
-              new AvroSink<>(
-                  tempDirectory,
-                  dynamicDestinations,
-                  AvroCoder.of(getRecordClass(), getSchema()),
-                  getCodec(),
-                  getMetadata()),
-              SerializableFunctions.<T>identity());
+
+      WriteFiles<T> write = WriteFiles.to(
+            new AvroSink<>(
+                getFilenamePrefix(),
+                usedFilenamePolicy,
+                AvroCoder.of(getRecordClass(), getSchema()),
+                getCodec(),
+                getMetadata()));
       if (getNumShards() > 0) {
         write = write.withNumShards(getNumShards());
       }
@@ -509,25 +473,31 @@ public class AvroIO {
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      resolveDynamicDestinations().populateDisplayData(builder);
-
-      String tempDirectory = null;
-      if (getTempDirectory() != null) {
-        tempDirectory =
-            getTempDirectory().isAccessible()
-                ? getTempDirectory().get().toString()
-                : getTempDirectory().toString();
+      checkState(
+          getFilenamePrefix() != null,
+          "Unable to populate DisplayData for invalid AvroIO.Write (unset output prefix).");
+      String outputPrefixString = null;
+      if (getFilenamePrefix().isAccessible()) {
+        ResourceId dir = getFilenamePrefix().get();
+        outputPrefixString = dir.toString();
+      } else {
+        outputPrefixString = getFilenamePrefix().toString();
       }
       builder
-          .add(DisplayData.item("schema", getRecordClass()).withLabel("Record Schema"))
-          .addIfNotDefault(
-              DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"), 0)
-          .addIfNotDefault(
-              DisplayData.item("codec", getCodec().toString()).withLabel("Avro Compression Codec"),
-              DEFAULT_CODEC.toString())
-          .addIfNotNull(
-              DisplayData.item("tempDirectory", tempDirectory)
-                  .withLabel("Directory for temporary files"));
+          .add(DisplayData.item("schema", getRecordClass())
+            .withLabel("Record Schema"))
+          .addIfNotNull(DisplayData.item("filePrefix", outputPrefixString)
+            .withLabel("Output File Prefix"))
+          .addIfNotNull(DisplayData.item("shardNameTemplate", getShardTemplate())
+              .withLabel("Output Shard Name Template"))
+          .addIfNotNull(DisplayData.item("fileSuffix", getFilenameSuffix())
+              .withLabel("Output File Suffix"))
+          .addIfNotDefault(DisplayData.item("numShards", getNumShards())
+              .withLabel("Maximum Output Shards"),
+              0)
+          .addIfNotDefault(DisplayData.item("codec", getCodec().toString())
+              .withLabel("Avro Compression Codec"),
+              DEFAULT_CODEC.toString());
       builder.include("Metadata", new Metadata());
     }
 

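The rewritten javadoc above changes how a filename policy is wired in: it now comes from withFilenamePolicy() rather than a to() overload, and windowed writes require both a policy and, on runners that don't pick one for you, an explicit shard count. A short sketch of the post-commit API, where MyWindowedPolicy is a hypothetical FileBasedSink.FilenamePolicy implementation:

    PCollection<AvroAutoGenClass> records = ...;
    records.apply(AvroIO.write(AvroAutoGenClass.class)
        .to("gs://my_bucket/path/to/records")
        .withFilenamePolicy(new MyWindowedPolicy())   // required when using withWindowedWrites()
        .withWindowedWrites()
        .withNumShards(3));                           // windowed writes need an explicit shard count
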
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
index c78870b..6c36266 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
@@ -32,40 +32,39 @@ import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.util.MimeTypes;
 
 /** A {@link FileBasedSink} for Avro files. */
-class AvroSink<T, DestinationT> extends FileBasedSink<T, DestinationT> {
+class AvroSink<T> extends FileBasedSink<T> {
   private final AvroCoder<T> coder;
   private final SerializableAvroCodecFactory codec;
   private final ImmutableMap<String, Object> metadata;
 
   AvroSink(
       ValueProvider<ResourceId> outputPrefix,
-      DynamicDestinations<T, DestinationT> dynamicDestinations,
+      FilenamePolicy filenamePolicy,
       AvroCoder<T> coder,
       SerializableAvroCodecFactory codec,
       ImmutableMap<String, Object> metadata) {
     // Avro handles compression internally using the codec.
-    super(outputPrefix, dynamicDestinations, CompressionType.UNCOMPRESSED);
+    super(outputPrefix, filenamePolicy, CompressionType.UNCOMPRESSED);
     this.coder = coder;
     this.codec = codec;
     this.metadata = metadata;
   }
 
   @Override
-  public WriteOperation<T, DestinationT> createWriteOperation() {
+  public WriteOperation<T> createWriteOperation() {
     return new AvroWriteOperation<>(this, coder, codec, metadata);
   }
 
   /** A {@link WriteOperation WriteOperation} for Avro files. */
-  private static class AvroWriteOperation<T, DestinationT> extends WriteOperation<T, DestinationT> {
+  private static class AvroWriteOperation<T> extends WriteOperation<T> {
     private final AvroCoder<T> coder;
     private final SerializableAvroCodecFactory codec;
     private final ImmutableMap<String, Object> metadata;
 
-    private AvroWriteOperation(
-        AvroSink<T, DestinationT> sink,
-        AvroCoder<T> coder,
-        SerializableAvroCodecFactory codec,
-        ImmutableMap<String, Object> metadata) {
+    private AvroWriteOperation(AvroSink<T> sink,
+                               AvroCoder<T> coder,
+                               SerializableAvroCodecFactory codec,
+                               ImmutableMap<String, Object> metadata) {
       super(sink);
       this.coder = coder;
       this.codec = codec;
@@ -73,23 +72,22 @@ class AvroSink<T, DestinationT> extends FileBasedSink<T, DestinationT> {
     }
 
     @Override
-    public Writer<T, DestinationT> createWriter() throws Exception {
+    public Writer<T> createWriter() throws Exception {
       return new AvroWriter<>(this, coder, codec, metadata);
     }
   }
 
   /** A {@link Writer Writer} for Avro files. */
-  private static class AvroWriter<T, DestinationT> extends Writer<T, DestinationT> {
+  private static class AvroWriter<T> extends Writer<T> {
     private final AvroCoder<T> coder;
     private DataFileWriter<T> dataFileWriter;
     private SerializableAvroCodecFactory codec;
     private final ImmutableMap<String, Object> metadata;
 
-    public AvroWriter(
-        WriteOperation<T, DestinationT> writeOperation,
-        AvroCoder<T> coder,
-        SerializableAvroCodecFactory codec,
-        ImmutableMap<String, Object> metadata) {
+    public AvroWriter(WriteOperation<T> writeOperation,
+                      AvroCoder<T> coder,
+                      SerializableAvroCodecFactory codec,
+                      ImmutableMap<String, Object> metadata) {
       super(writeOperation, MimeTypes.BINARY);
       this.coder = coder;
       this.codec = codec;

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index 4baac36..6ab8dec 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -96,6 +96,12 @@ public class CompressedSource<T> extends FileBasedSource<T> {
      */
    ReadableByteChannel createDecompressingChannel(String fileName, ReadableByteChannel channel)
         throws IOException;
+
+    /**
+     * Given a file name, returns true if the file name matches any supported compression
+     * scheme.
+     */
+    boolean isCompressed(String fileName);
   }
 
   /**
@@ -236,16 +242,6 @@ public class CompressedSource<T> extends FileBasedSource<T> {
     @Override
    public abstract ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
         throws IOException;
-
-    /** Returns whether the file's extension matches of one of the known compression formats. */
-    public static boolean isCompressed(String filename) {
-      for (CompressionMode type : CompressionMode.values()) {
-        if  (type.matches(filename)) {
-          return true;
-        }
-      }
-      return false;
-    }
   }
 
   /**
@@ -277,6 +273,16 @@ public class CompressedSource<T> extends FileBasedSource<T> {
               ReadableByteChannel.class.getSimpleName(),
               ReadableByteChannel.class.getSimpleName()));
     }
+
+    @Override
+    public boolean isCompressed(String fileName) {
+      for (CompressionMode type : CompressionMode.values()) {
+        if  (type.matches(fileName)) {
+          return true;
+        }
+      }
+      return false;
+    }
   }
 
   private final FileBasedSource<T> sourceDelegate;
@@ -360,9 +366,13 @@ public class CompressedSource<T> extends FileBasedSource<T> {
    */
   @Override
   protected final boolean isSplittable() throws Exception {
-    return channelFactory instanceof FileNameBasedDecompressingChannelFactory
-        && !CompressionMode.isCompressed(getFileOrPatternSpec())
-        && sourceDelegate.isSplittable();
+    if (channelFactory instanceof FileNameBasedDecompressingChannelFactory) {
+      FileNameBasedDecompressingChannelFactory fileNameBasedChannelFactory =
+          (FileNameBasedDecompressingChannelFactory) channelFactory;
+      return !fileNameBasedChannelFactory.isCompressed(getFileOrPatternSpec())
+          && sourceDelegate.isSplittable();
+    }
+    return false;
   }
 
   /**
@@ -376,7 +386,9 @@ public class CompressedSource<T> extends FileBasedSource<T> {
   @Override
   protected final FileBasedReader<T> createSingleFileReader(PipelineOptions options) {
     if (channelFactory instanceof FileNameBasedDecompressingChannelFactory) {
-      if (!CompressionMode.isCompressed(getFileOrPatternSpec())) {
+      FileNameBasedDecompressingChannelFactory fileNameBasedChannelFactory =
+          (FileNameBasedDecompressingChannelFactory) channelFactory;
+      if (!fileNameBasedChannelFactory.isCompressed(getFileOrPatternSpec())) {
         return sourceDelegate.createSingleFileReader(options);
       }
     }

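The change above moves isCompressed() from a static CompressionMode helper onto the FileNameBasedDecompressingChannelFactory interface, so isSplittable() defers to whichever factory is installed. A sketch of a custom factory under that contract, assuming the interface carries only the two methods shown in these hunks (imports from java.nio.channels and java.util.zip); GzipOnlyChannelFactory is hypothetical:

    // Treats only ".gz" files as compressed; all other files remain splittable.
    class GzipOnlyChannelFactory
        implements CompressedSource.FileNameBasedDecompressingChannelFactory {
      @Override
      public ReadableByteChannel createDecompressingChannel(
          String fileName, ReadableByteChannel channel) throws IOException {
        return isCompressed(fileName)
            ? Channels.newChannel(new GZIPInputStream(Channels.newInputStream(channel)))
            : channel;
      }

      @Override
      public boolean isCompressed(String fileName) {
        return fileName.endsWith(".gz");
      }
    }
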
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
index 7a60e49..f9e4ac4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
@@ -20,31 +20,25 @@ package org.apache.beam.sdk.io;
 import static com.google.common.base.MoreObjects.firstNonNull;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
 import java.text.DecimalFormat;
 import java.util.Arrays;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
-import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A default {@link FilenamePolicy} for windowed and unwindowed files. This 
policy is constructed
@@ -57,7 +51,10 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
  * {@code WriteOneFilePerWindow} example pipeline.
  */
 public final class DefaultFilenamePolicy extends FilenamePolicy {
-  /** The default sharding name template. */
+
+  private static final Logger LOG = LoggerFactory.getLogger(DefaultFilenamePolicy.class);
+
+  /** The default sharding name template used in {@link #constructUsingStandardParameters}. */
   public static final String DEFAULT_UNWINDOWED_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
 
   /** The default windowed sharding name template used when writing windowed files.
@@ -70,184 +67,75 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
       "W-P" + DEFAULT_UNWINDOWED_SHARD_TEMPLATE;
 
   /*
-   * pattern for both windowed and non-windowed file names.
+   * pattern for both windowed and non-windowed file names
    */
   private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+|W|P)");
 
   /**
-   * Encapsulates constructor parameters to {@link DefaultFilenamePolicy}.
-   *
-   * <p>This is used as the {@code DestinationT} argument to allow {@link DefaultFilenamePolicy}
-   * objects to be dynamically generated.
-   */
-  public static class Params implements Serializable {
-    private final ValueProvider<ResourceId> baseFilename;
-    private final String shardTemplate;
-    private final boolean explicitTemplate;
-    private final String suffix;
-
-    /**
-     * Construct a default Params object. The shard template will be set to the default {@link
-     * #DEFAULT_UNWINDOWED_SHARD_TEMPLATE} value.
-     */
-    public Params() {
-      this.baseFilename = null;
-      this.shardTemplate = DEFAULT_UNWINDOWED_SHARD_TEMPLATE;
-      this.suffix = "";
-      this.explicitTemplate = false;
-    }
-
-    private Params(
-        ValueProvider<ResourceId> baseFilename,
-        String shardTemplate,
-        String suffix,
-        boolean explicitTemplate) {
-      this.baseFilename = baseFilename;
-      this.shardTemplate = shardTemplate;
-      this.suffix = suffix;
-      this.explicitTemplate = explicitTemplate;
-    }
-
-    /**
-     * Specify that writes are windowed. This affects the default shard template, changing it to
-     * {@link #DEFAULT_WINDOWED_SHARD_TEMPLATE}.
-     */
-    public Params withWindowedWrites() {
-      String template = this.shardTemplate;
-      if (!explicitTemplate) {
-        template = DEFAULT_WINDOWED_SHARD_TEMPLATE;
-      }
-      return new Params(baseFilename, template, suffix, explicitTemplate);
-    }
-
-    /** Sets the base filename. */
-    public Params withBaseFilename(ResourceId baseFilename) {
-      return withBaseFilename(StaticValueProvider.of(baseFilename));
-    }
-
-    /** Like {@link #withBaseFilename(ResourceId)}, but takes in a {@link ValueProvider}. */
-    public Params withBaseFilename(ValueProvider<ResourceId> baseFilename) {
-      return new Params(baseFilename, shardTemplate, suffix, explicitTemplate);
-    }
-
-    /** Sets the shard template. */
-    public Params withShardTemplate(String shardTemplate) {
-      return new Params(baseFilename, shardTemplate, suffix, true);
-    }
-
-    /** Sets the suffix. */
-    public Params withSuffix(String suffix) {
-      return new Params(baseFilename, shardTemplate, suffix, explicitTemplate);
-    }
-  }
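
For reference, a minimal sketch of how the Params builder removed above was used to obtain a
filename policy. The output path and the FileSystems.matchNewResource call are illustrative
assumptions, not part of this commit; fromParams is the factory removed later in this diff.

    import org.apache.beam.sdk.io.DefaultFilenamePolicy;
    import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
    import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
    import org.apache.beam.sdk.io.FileSystems;

    public class ParamsSketch {
      public static FilenamePolicy buildPolicy() {
        // Hypothetical output location; any ResourceId would do here.
        Params params = new Params()
            .withBaseFilename(FileSystems.matchNewResource("/tmp/out/part", false))
            .withShardTemplate("-SSS-of-NNN")
            .withSuffix(".txt");
        return DefaultFilenamePolicy.fromParams(params);
      }
    }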
-
-  /** A Coder for {@link Params}. */
-  public static class ParamsCoder extends AtomicCoder<Params> {
-    private static final ParamsCoder INSTANCE = new ParamsCoder();
-    private Coder<String> stringCoder = StringUtf8Coder.of();
-
-    public static ParamsCoder of() {
-      return INSTANCE;
-    }
-
-    @Override
-    public void encode(Params value, OutputStream outStream) throws IOException {
-      if (value == null) {
-        throw new CoderException("cannot encode a null value");
-      }
-      stringCoder.encode(value.baseFilename.get().toString(), outStream);
-      stringCoder.encode(value.shardTemplate, outStream);
-      stringCoder.encode(value.suffix, outStream);
-    }
-
-    @Override
-    public Params decode(InputStream inStream) throws IOException {
-      ResourceId prefix =
-          FileBasedSink.convertToFileResourceIfPossible(stringCoder.decode(inStream));
-      String shardTemplate = stringCoder.decode(inStream);
-      String suffix = stringCoder.decode(inStream);
-      return new Params()
-          .withBaseFilename(prefix)
-          .withShardTemplate(shardTemplate)
-          .withSuffix(suffix);
-    }
-  }
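
A round-trip sketch for the ParamsCoder shown above. Note that encode() flattens the base
filename to a plain string, which decode() re-materializes via convertToFileResourceIfPossible,
so a runtime-valued ValueProvider cannot survive the trip. The CoderUtils helper and the path
used are assumptions for illustration.

    import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
    import org.apache.beam.sdk.io.DefaultFilenamePolicy.ParamsCoder;
    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.util.CoderUtils;

    public class ParamsCoderSketch {
      public static Params roundTrip() throws Exception {
        Params params = new Params()
            .withBaseFilename(FileSystems.matchNewResource("/tmp/out/part", false))
            .withSuffix(".csv");
        // Encode to bytes and back; shard template and suffix are preserved verbatim.
        byte[] bytes = CoderUtils.encodeToByteArray(ParamsCoder.of(), params);
        return CoderUtils.decodeFromByteArray(ParamsCoder.of(), bytes);
      }
    }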
-
-  private final Params params;
-  /**
    * Constructs a new {@link DefaultFilenamePolicy}.
    *
   * @see DefaultFilenamePolicy for more information on the arguments to this function.
    */
   @VisibleForTesting
-  DefaultFilenamePolicy(Params params) {
-    this.params = params;
+  DefaultFilenamePolicy(ValueProvider<String> prefix, String shardTemplate, String suffix) {
+    this.prefix = prefix;
+    this.shardTemplate = shardTemplate;
+    this.suffix = suffix;
   }
 
   /**
-   * Construct a {@link DefaultFilenamePolicy}.
+   * A helper function to construct a {@link DefaultFilenamePolicy} using the standard filename
+   * parameters, namely a provided {@link ResourceId} for the output prefix, and possibly-null
+   * shard name template and suffix.
    *
-   * <p>This is a shortcut for:
+   * <p>Any filename component of the provided resource will be used as the filename prefix.
    *
-   * <pre>{@code
-   *   DefaultFilenamePolicy.fromParams(new Params()
-   *     .withBaseFilename(baseFilename)
-   *     .withShardTemplate(shardTemplate)
-   *     .withSuffix(filenameSuffix)
-   *     .withWindowedWrites())
-   * }</pre>
+   * <p>If provided, the shard name template will be used; otherwise
+   * {@link #DEFAULT_UNWINDOWED_SHARD_TEMPLATE} will be used for non-windowed file names and
+   * {@link #DEFAULT_WINDOWED_SHARD_TEMPLATE} will be used for windowed file names.
    *
-   * <p>Where the respective {@code with} methods are invoked only if the value is non-null or true.
+   * <p>If provided, the suffix will be used; otherwise the files will have an empty suffix.
    */
-  public static DefaultFilenamePolicy fromStandardParameters(
-      ValueProvider<ResourceId> baseFilename,
+  public static DefaultFilenamePolicy constructUsingStandardParameters(
+      ValueProvider<ResourceId> outputPrefix,
       @Nullable String shardTemplate,
       @Nullable String filenameSuffix,
       boolean windowedWrites) {
-    Params params = new Params().withBaseFilename(baseFilename);
-    if (shardTemplate != null) {
-      params = params.withShardTemplate(shardTemplate);
-    }
-    if (filenameSuffix != null) {
-      params = params.withSuffix(filenameSuffix);
-    }
-    if (windowedWrites) {
-      params = params.withWindowedWrites();
-    }
-    return fromParams(params);
+    // Pick the appropriate default shard template, depending on whether windowed writes are
+    // being performed.
+    String defaultTemplate =
+        windowedWrites ? DEFAULT_WINDOWED_SHARD_TEMPLATE : DEFAULT_UNWINDOWED_SHARD_TEMPLATE;
+    return new DefaultFilenamePolicy(
+        NestedValueProvider.of(outputPrefix, new ExtractFilename()),
+        firstNonNull(shardTemplate, defaultTemplate),
+        firstNonNull(filenameSuffix, ""));
   }
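
A usage sketch of the replacement helper added above. StaticValueProvider and
FileSystems.matchNewResource are assumed here for illustration; passing null for the template
and suffix selects the defaults documented in the javadoc.

    import org.apache.beam.sdk.io.DefaultFilenamePolicy;
    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;

    public class PolicySketch {
      public static DefaultFilenamePolicy defaultPolicy() {
        return DefaultFilenamePolicy.constructUsingStandardParameters(
            StaticValueProvider.of(FileSystems.matchNewResource("/tmp/out/part", false)),
            null /* shardTemplate: falls back to the unwindowed default */,
            null /* filenameSuffix: falls back to "" */,
            false /* windowedWrites */);
      }
    }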
 
-  /** Construct a {@link DefaultFilenamePolicy} from a {@link Params} object. */
-  public static DefaultFilenamePolicy fromParams(Params params) {
-    return new DefaultFilenamePolicy(params);
-  }
+  private final ValueProvider<String> prefix;
+  private final String shardTemplate;
+  private final String suffix;
 
   /**
    * Constructs a fully qualified name from components.
    *
-   * <p>The name is built from a base filename, shard template (with shard numbers applied), and a
-   * suffix. All components are required, but may be empty strings.
+   * <p>The name is built from a prefix, shard template (with shard numbers
+   * applied), and a suffix.  All components are required, but may be empty
+   * strings.
    *
-   * <p>Within a shard template, repeating sequences of the letters "S" or "N" are replaced with the
-   * shard number, or number of shards respectively. "P" is replaced by stringification of
-   * current pane. "W" is replaced by stringification of current window.
+   * <p>Within a shard template, repeating sequences of the letters "S" or "N"
+   * are replaced with the shard number, or number of shards respectively.
+   * "P" is replaced by stringification of current pane.
+   * "W" is replaced by stringification of current window.
    *
-   * <p>The numbers are formatted with leading zeros to match the length of the repeated sequence of
-   * letters.
+   * <p>The numbers are formatted with leading zeros to match the length of the
+   * repeated sequence of letters.
    *
-   * <p>For example, if baseFilename = "path/to/output", shardTemplate = "-SSS-of-NNN", and suffix =
-   * ".txt", with shardNum = 1 and numShards = 100, the following is produced:
-   * "path/to/output-001-of-100.txt".
+   * <p>For example, if prefix = "output", shardTemplate = "-SSS-of-NNN", and
+   * suffix = ".txt", with shardNum = 1 and numShards = 100, the following is
+   * produced:  "output-001-of-100.txt".
    */
-  static ResourceId constructName(
-      ResourceId baseFilename,
-      String shardTemplate,
-      String suffix,
-      int shardNum,
-      int numShards,
-      String paneStr,
-      String windowStr) {
-    String prefix = extractFilename(baseFilename);
+  static String constructName(
+      String prefix, String shardTemplate, String suffix, int shardNum, int numShards,
+      String paneStr, String windowStr) {
     // Matcher API works with StringBuffer, rather than StringBuilder.
     StringBuffer sb = new StringBuffer();
     sb.append(prefix);
@@ -277,37 +165,27 @@ public final class DefaultFilenamePolicy extends 
FilenamePolicy {
     m.appendTail(sb);
 
     sb.append(suffix);
-    return baseFilename
-        .getCurrentDirectory()
-        .resolve(sb.toString(), StandardResolveOptions.RESOLVE_FILE);
+    return sb.toString();
   }
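
A self-contained illustration (not Beam code) of the shard-template expansion implemented
above, using the same regex-driven padding rule; the W/P window and pane tokens are omitted
since they need runtime window context.

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class ShardTemplateDemo {
      // Same token pattern as SHARD_FORMAT_RE above, minus the W and P tokens.
      private static final Pattern RE = Pattern.compile("(S+|N+)");

      public static String expand(String template, int shardNum, int numShards) {
        Matcher m = RE.matcher(template);
        StringBuffer sb = new StringBuffer(); // Matcher API works with StringBuffer.
        while (m.find()) {
          String token = m.group(1);
          int value = token.charAt(0) == 'S' ? shardNum : numShards;
          // Pad with leading zeros to the length of the repeated letter run.
          m.appendReplacement(sb, String.format("%0" + token.length() + "d", value));
        }
        m.appendTail(sb);
        return sb.toString();
      }

      public static void main(String[] args) {
        // Prints "output-001-of-100.txt", matching the javadoc example above.
        System.out.println("output" + expand("-SSS-of-NNN", 1, 100) + ".txt");
      }
    }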
 
   @Override
   @Nullable
-  public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) {
-    return constructName(
-        params.baseFilename.get(),
-        params.shardTemplate,
-        params.suffix + outputFileHints.getSuggestedFilenameSuffix(),
-        context.getShardNumber(),
-        context.getNumShards(),
-        null,
-        null);
+  public ResourceId unwindowedFilename(ResourceId outputDirectory, Context context,
+      String extension) {
+    String filename = constructName(prefix.get(), shardTemplate, suffix, context.getShardNumber(),
+        context.getNumShards(), null, null) + extension;
+    return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
   }
 
   @Override
-  public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) {
+  public ResourceId windowedFilename(ResourceId outputDirectory,
+      WindowedContext context, String extension) {
     final PaneInfo paneInfo = context.getPaneInfo();
     String paneStr = paneInfoToString(paneInfo);
     String windowStr = windowToString(context.getWindow());
-    return constructName(
-        params.baseFilename.get(),
-        params.shardTemplate,
-        params.suffix + outputFileHints.getSuggestedFilenameSuffix(),
-        context.getShardNumber(),
-        context.getNumShards(),
-        paneStr,
-        windowStr);
+    String filename = constructName(prefix.get(), shardTemplate, suffix, context.getShardNumber(),
+        context.getNumShards(), paneStr, windowStr) + extension;
+    return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
   }
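
A sketch of the per-shard resolution shown in the two methods above: the policy builds a bare
filename and resolves it against the output directory supplied by the sink. The Context
constructor signature and paths here are assumptions from this Beam snapshot, for illustration
only.

    import org.apache.beam.sdk.io.DefaultFilenamePolicy;
    import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context;
    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.io.fs.ResourceId;
    import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;

    public class ShardPathSketch {
      public static ResourceId shardPath() {
        DefaultFilenamePolicy policy =
            DefaultFilenamePolicy.constructUsingStandardParameters(
                StaticValueProvider.of(FileSystems.matchNewResource("/tmp/out/part", false)),
                null, ".txt", false);
        ResourceId dir = FileSystems.matchNewResource("/tmp/out/", true);
        // With the default template this resolves to /tmp/out/part-00001-of-00100.txt.gz;
        // the ".gz" extension is supplied by the sink's writer in practice.
        return policy.unwindowedFilename(dir, new Context(1, 100), ".gz");
      }
    }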
 
   /*
@@ -338,32 +216,24 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
     String filenamePattern;
-    if (params.baseFilename.isAccessible()) {
-      filenamePattern =
-          String.format("%s%s%s", params.baseFilename.get(), params.shardTemplate, params.suffix);
+    if (prefix.isAccessible()) {
+      filenamePattern = String.format("%s%s%s", prefix.get(), shardTemplate, suffix);
     } else {
-      filenamePattern =
-          String.format("%s%s%s", params.baseFilename, params.shardTemplate, params.suffix);
+      filenamePattern = String.format("%s%s%s", prefix, shardTemplate, suffix);
     }
-
-    String outputPrefixString = null;
-    outputPrefixString =
-        params.baseFilename.isAccessible()
-            ? params.baseFilename.get().toString()
-            : params.baseFilename.toString();
-    builder.add(DisplayData.item("filenamePattern", 
filenamePattern).withLabel("Filename Pattern"));
-    builder.add(DisplayData.item("filePrefix", 
outputPrefixString).withLabel("Output File Prefix"));
-    builder.add(DisplayData.item("fileSuffix", 
params.suffix).withLabel("Output file Suffix"));
     builder.add(
-        DisplayData.item("shardNameTemplate", params.shardTemplate)
-            .withLabel("Output Shard Name Template"));
+        DisplayData.item("filenamePattern", filenamePattern)
+            .withLabel("Filename Pattern"));
   }
 
-  private static String extractFilename(ResourceId input) {
-    if (input.isDirectory()) {
-      return "";
-    } else {
-      return firstNonNull(input.getFilename(), "");
+  private static class ExtractFilename implements SerializableFunction<ResourceId, String> {
+    @Override
+    public String apply(ResourceId input) {
+      if (input.isDirectory()) {
+        return "";
+      } else {
+        return firstNonNull(input.getFilename(), "");
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
deleted file mode 100644
index e7ef0f6..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io;
-
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
-import org.apache.beam.sdk.io.DefaultFilenamePolicy.ParamsCoder;
-import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
-import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-
-/** Some helper classes that derive from {@link FileBasedSink.DynamicDestinations}. */
-public class DynamicFileDestinations {
-  /** Always returns a constant {@link FilenamePolicy}. */
-  private static class ConstantFilenamePolicy<T> extends DynamicDestinations<T, Void> {
-    private final FilenamePolicy filenamePolicy;
-
-    public ConstantFilenamePolicy(FilenamePolicy filenamePolicy) {
-      this.filenamePolicy = filenamePolicy;
-    }
-
-    @Override
-    public Void getDestination(T element) {
-      return (Void) null;
-    }
-
-    @Override
-    public Coder<Void> getDestinationCoder() {
-      return null;
-    }
-
-    @Override
-    public Void getDefaultDestination() {
-      return (Void) null;
-    }
-
-    @Override
-    public FilenamePolicy getFilenamePolicy(Void destination) {
-      return filenamePolicy;
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      filenamePolicy.populateDisplayData(builder);
-    }
-  }
-
-  /**
-   * A base class for a {@link DynamicDestinations} object that returns differently-configured
-   * instances of {@link DefaultFilenamePolicy}.
-   */
-  private static class DefaultPolicyDestinations<UserT> extends DynamicDestinations<UserT, Params> {
-    SerializableFunction<UserT, Params> destinationFunction;
-    Params emptyDestination;
-
-    public DefaultPolicyDestinations(
-        SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) {
-      this.destinationFunction = destinationFunction;
-      this.emptyDestination = emptyDestination;
-    }
-
-    @Override
-    public Params getDestination(UserT element) {
-      return destinationFunction.apply(element);
-    }
-
-    @Override
-    public Params getDefaultDestination() {
-      return emptyDestination;
-    }
-
-    @Nullable
-    @Override
-    public Coder<Params> getDestinationCoder() {
-      return ParamsCoder.of();
-    }
-
-    @Override
-    public FilenamePolicy getFilenamePolicy(DefaultFilenamePolicy.Params params) {
-      return DefaultFilenamePolicy.fromParams(params);
-    }
-  }
-
-  /** Returns a {@link DynamicDestinations} that always returns the same {@link FilenamePolicy}. */
-  public static <T> DynamicDestinations<T, Void> constant(FilenamePolicy filenamePolicy) {
-    return new ConstantFilenamePolicy<>(filenamePolicy);
-  }
-
-  /**
-   * Returns a {@link DynamicDestinations} that returns instances of {@link DefaultFilenamePolicy}
-   * configured with the given {@link Params}.
-   */
-  public static <UserT> DynamicDestinations<UserT, Params> toDefaultPolicies(
-      SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) {
-    return new DefaultPolicyDestinations<>(destinationFunction, emptyDestination);
-  }
-}
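
For context, a sketch of how the removed helpers composed: toDefaultPolicies routes each
element to its own Params, which getFilenamePolicy then turns into a DefaultFilenamePolicy.
The element-to-Params function and the paths are illustrative assumptions.

    import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
    import org.apache.beam.sdk.io.DynamicFileDestinations;
    import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.transforms.SerializableFunction;

    public class DestinationsSketch {
      public static DynamicDestinations<String, Params> perElementDestinations() {
        // Fallback destination used when a default is needed.
        Params empty = new Params()
            .withBaseFilename(FileSystems.matchNewResource("/tmp/out/default", false));
        // Map each element to a destination derived from its value.
        SerializableFunction<String, Params> fn =
            new SerializableFunction<String, Params>() {
              @Override
              public Params apply(String element) {
                return new Params()
                    .withBaseFilename(FileSystems.matchNewResource("/tmp/out/" + element, false))
                    .withSuffix(".txt");
              }
            };
        return DynamicFileDestinations.toDefaultPolicies(fn, empty);
      }
    }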
