http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/test/java/org/apache/beam/runners/spark/stateful/SparkStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/stateful/SparkStateInternalsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/stateful/SparkStateInternalsTest.java deleted file mode 100644 index b4597f9..0000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/stateful/SparkStateInternalsTest.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.spark.stateful; - -import org.apache.beam.runners.core.StateInternals; -import org.apache.beam.runners.core.StateInternalsTest; -import org.junit.Ignore; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link SparkStateInternals}. This is based on {@link StateInternalsTest}. - * Ignore set and map tests. - */ -@RunWith(JUnit4.class) -public class SparkStateInternalsTest extends StateInternalsTest { - - @Override - protected StateInternals createStateInternals() { - return SparkStateInternals.forKey("dummyKey"); - } - - @Override - @Ignore - public void testSet() {} - - @Override - @Ignore - public void testSetIsEmpty() {} - - @Override - @Ignore - public void testMergeSetIntoSource() {} - - @Override - @Ignore - public void testMergeSetIntoNewNamespace() {} - - @Override - @Ignore - public void testMap() {} - - @Override - @Ignore - public void testSetReadable() {} - - @Override - @Ignore - public void testMapReadable() {} - -}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java index 8bd6dae..8f2e681 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java @@ -58,12 +58,12 @@ public class StorageLevelTest { @Test public void test() throws Exception { - PCollection<String> pCollection = pipeline.apply("CreateFoo", Create.of("foo")); + PCollection<String> pCollection = pipeline.apply(Create.of("foo")); // by default, the Spark runner doesn't cache the RDD if it accessed only one time. // So, to "force" the caching of the RDD, we have to call the RDD at least two time. // That's why we are using Count fn on the PCollection. - pCollection.apply("CountAll", Count.<String>globally()); + pCollection.apply(Count.<String>globally()); PCollection<String> output = pCollection.apply(new StorageLevelPTransform()); http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/common/fn-api/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/common/fn-api/pom.xml b/sdks/common/fn-api/pom.xml index 6810667..77a9ba5 100644 --- a/sdks/common/fn-api/pom.xml +++ b/sdks/common/fn-api/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-common-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto ---------------------------------------------------------------------- diff --git a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto index 9da5afe..9fe2b2f 100644 --- a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto +++ b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto @@ -38,6 +38,7 @@ option java_package = "org.apache.beam.fn.v1"; option java_outer_classname = "BeamFnApi"; import "beam_runner_api.proto"; +import "google/protobuf/any.proto"; import "google/protobuf/timestamp.proto"; /* @@ -66,6 +67,122 @@ message Target { string name = 2; } +// Information defining a PCollection +message PCollection { + // (Required) A reference to a coder. + string coder_reference = 1; + + // TODO: Windowing strategy, ... +} + +// A primitive transform within Apache Beam. +message PrimitiveTransform { + // (Required) A pipeline level unique id which can be used as a reference to + // refer to this. + string id = 1; + + // (Required) A function spec that is used by this primitive + // transform to process data. + FunctionSpec function_spec = 2; + + // A map of distinct input names to target definitions. + // For example, in CoGbk this represents the tag name associated with each + // distinct input name and a list of primitive transforms that are associated + // with the specified input. + map<string, Target.List> inputs = 3; + + // A map from local output name to PCollection definitions. For example, in + // DoFn this represents the tag name associated with each distinct output. + map<string, PCollection> outputs = 4; + + // TODO: Should we model side inputs as a special type of input for a + // primitive transform or should it be modeled as the relationship that + // the predecessor input will be a view primitive transform. + // A map of from side input names to side inputs. + map<string, SideInput> side_inputs = 5; + + // The user name of this step. + // TODO: This should really be in display data and not at this level + string step_name = 6; +} + +/* + * User Definable Functions + * + * This is still unstable mainly due to how we model the side input. + */ + +// Defines the common elements of user-definable functions, to allow the SDK to +// express the information the runner needs to execute work. +// Stable +message FunctionSpec { + // (Required) A pipeline level unique id which can be used as a reference to + // refer to this. + string id = 1; + + // (Required) A globally unique name representing this user definable + // function. + // + // User definable functions use the urn encodings registered such that another + // may implement the user definable function within another language. + // + // For example: + // urn:org.apache.beam:coder:kv:1.0 + string urn = 2; + + // (Required) Reference to specification of execution environment required to + // invoke this function. + string environment_reference = 3; + + // Data used to parameterize this function. Depending on the urn, this may be + // optional or required. + google.protobuf.Any data = 4; +} + +message SideInput { + // TODO: Coder? + + // For RunnerAPI. + Target input = 1; + + // For FnAPI. + FunctionSpec view_fn = 2; +} + +// Defines how to encode values into byte streams and decode values from byte +// streams. A coder can be parameterized by additional properties which may or +// may not be language agnostic. +// +// Coders using the urn:org.apache.beam:coder namespace must have their +// encodings registered such that another may implement the encoding within +// another language. +// +// For example: +// urn:org.apache.beam:coder:kv:1.0 +// urn:org.apache.beam:coder:iterable:1.0 +// Stable +message Coder { + // TODO: This looks weird when compared to the other function specs + // which use URN to differentiate themselves. Should "Coder" be embedded + // inside the FunctionSpec data block. + + // The data associated with this coder used to reconstruct it. + FunctionSpec function_spec = 1; + + // A list of component coder references. + // + // For a key-value coder, there must be exactly two component coder references + // where the first reference represents the key coder and the second reference + // is the value coder. + // + // For an iterable coder, there must be exactly one component coder reference + // representing the value coder. + // + // TODO: Perhaps this is redundant with the data of the FunctionSpec + // for known coders? + repeated string component_coder_reference = 2; +} + // A descriptor for connecting to a remote port using the Beam Fn Data API. // Allows for communication between two environments (for example between the // runner and the SDK). @@ -154,20 +271,29 @@ message ProcessBundleDescriptor { // refer to this. string id = 1; + // (Deprecated) A list of primitive transforms that should + // be used to construct the bundle processing graph. + repeated PrimitiveTransform primitive_transform = 2; + + // (Deprecated) The set of all coders referenced in this bundle. + repeated Coder coders = 4; + // (Required) A map from pipeline-scoped id to PTransform. - map<string, org.apache.beam.runner_api.v1.PTransform> transforms = 2; + map<string, org.apache.beam.runner_api.v1.PTransform> transforms = 5; // (Required) A map from pipeline-scoped id to PCollection. - map<string, org.apache.beam.runner_api.v1.PCollection> pcollections = 3; + map<string, org.apache.beam.runner_api.v1.PCollection> pcollections = 6; // (Required) A map from pipeline-scoped id to WindowingStrategy. - map<string, org.apache.beam.runner_api.v1.WindowingStrategy> windowing_strategies = 4; + map<string, org.apache.beam.runner_api.v1.WindowingStrategy> windowing_strategies = 7; // (Required) A map from pipeline-scoped id to Coder. - map<string, org.apache.beam.runner_api.v1.Coder> coders = 5; + // TODO: Rename to "coders" once deprecated coders field is removed. Unique + // name is choosen to make it an easy search/replace + map<string, org.apache.beam.runner_api.v1.Coder> codersyyy = 8; // (Required) A map from pipeline-scoped id to Environment. - map<string, org.apache.beam.runner_api.v1.Environment> environments = 6; + map<string, org.apache.beam.runner_api.v1.Environment> environments = 9; } // A request to process a given bundle. @@ -177,9 +303,9 @@ message ProcessBundleRequest { // instantiated and executed by the SDK harness. string process_bundle_descriptor_reference = 1; - // (Optional) A list of cache tokens that can be used by an SDK to reuse - // cached data returned by the State API across multiple bundles. - repeated bytes cache_tokens = 2; + // (Optional) A list of cache tokens that can be used by an SDK to cache + // data looked up using the State API across multiple bundles. + repeated CacheToken cache_tokens = 2; } // Stable @@ -248,14 +374,14 @@ message PrimitiveTransformSplit { // // For example, a remote GRPC source will have a specific urn and data // block containing an ElementCountRestriction. - org.apache.beam.runner_api.v1.FunctionSpec completed_restriction = 2; + FunctionSpec completed_restriction = 2; // (Required) A function specification describing the restriction // representing the remainder of work for the primitive transform. // // FOr example, a remote GRPC source will have a specific urn and data // block contain an ElemntCountSkipRestriction. - org.apache.beam.runner_api.v1.FunctionSpec remaining_restriction = 3; + FunctionSpec remaining_restriction = 3; } message ProcessBundleSplitResponse { @@ -402,10 +528,6 @@ message StateResponse { // failed. string error = 2; - // (Optional) If this is specified, then the result of this state request - // can be cached using the supplied token. - bytes cache_token = 3; - // A corresponding response matching the request will be populated. oneof response { // A response to getting state. @@ -431,44 +553,49 @@ service BeamFnState { ) {} } +message CacheToken { + // (Required) Represents the function spec and tag associated with this state + // key. + // + // By combining the function_spec_reference with the tag representing: + // * the input, we refer to the iterable portion of a large GBK + // * the side input, we refer to the side input + // * the user state, we refer to user state + Target target = 1; + + // (Required) An opaque identifier. + bytes token = 2; +} + message StateKey { - message Runner { - // (Required) Opaque information supplied by the runner. Used to support - // remote references. - bytes key = 1; - } + // (Required) Represents the function spec and tag associated with this state + // key. + // + // By combining the function_spec_reference with the tag representing: + // * the input, we refer to fetching the iterable portion of a large GBK + // * the side input, we refer to fetching the side input + // * the user state, we refer to fetching user state + Target target = 1; - message MultimapSideInput { - // (Required) The id of the PTransform containing a side input. - string ptransform_id = 1; - // (Required) The id of the side input. - string side_input_id = 2; - // (Required) The window (after mapping the currently executing elements - // window into the side input windows domain) encoded in a nested context. - bytes window = 3; - // (Required) The key encoded in a nested context. - bytes key = 4; - } + // (Required) The bytes of the window which this state request is for encoded + // in the nested context. + bytes window = 2; - message BagUserState { - // (Required) The id of the PTransform containing user state. - string ptransform_id = 1; - // (Required) The id of the user state. - string user_state_id = 2; - // (Required) The window encoded in a nested context. - bytes window = 3; - // (Required) The key of the currently executing element encoded in a - // nested context. - bytes key = 4; - } + // (Required) The user key encoded in the nested context. + bytes key = 3; +} - // (Required) One of the following state keys must be set. - oneof type { - Runner runner = 1; - MultimapSideInput multimap_side_input = 2; - BagUserState bag_user_state = 3; - // TODO: represent a state key for user map state - } +// A logical byte stream which can be continued using the state API. +message ContinuableStream { + // (Optional) If specified, represents a token which can be used with the + // state API to get the next chunk of this logical byte stream. The end of + // the logical byte stream is signalled by this field being unset. + bytes continuation_token = 1; + + // Represents a part of a logical byte stream. Elements within + // the logical byte stream are encoded in the nested context and + // concatenated together. + bytes data = 2; } // A request to get state. @@ -481,18 +608,10 @@ message StateGetRequest { bytes continuation_token = 1; } -// A response to get state representing a logical byte stream which can be -// continued using the state API. +// A response to get state. message StateGetResponse { - // (Optional) If specified, represents a token which can be used with the - // state API to get the next chunk of this logical byte stream. The end of - // the logical byte stream is signalled by this field being unset. - bytes continuation_token = 1; - - // Represents a part of a logical byte stream. Elements within - // the logical byte stream are encoded in the nested context and - // concatenated together. - bytes data = 2; + // (Required) The response containing a continuable logical byte stream. + ContinuableStream stream = 1; } // A request to append state. http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/common/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/common/pom.xml b/sdks/common/pom.xml index 40eefa7..c621ed5 100644 --- a/sdks/common/pom.xml +++ b/sdks/common/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/common/runner-api/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/common/runner-api/pom.xml b/sdks/common/runner-api/pom.xml index 8bc4123..f5536a7 100644 --- a/sdks/common/runner-api/pom.xml +++ b/sdks/common/runner-api/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-common-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto ---------------------------------------------------------------------- diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto index 711da2a..039ecb0 100644 --- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto +++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto @@ -367,12 +367,9 @@ message WriteFilesPayload { // (Required) The SdkFunctionSpec of the FileBasedSink. SdkFunctionSpec sink = 1; - // (Required) The format function. - SdkFunctionSpec format_function = 2; + bool windowed_writes = 2; - bool windowed_writes = 3; - - bool runner_determined_sharding = 4; + bool runner_determined_sharding = 3; } // A coder, the binary format for serialization and deserialization of data in @@ -436,14 +433,6 @@ message WindowingStrategy { // (Required) The duration, in milliseconds, beyond the end of a window at // which the window becomes droppable. int64 allowed_lateness = 8; - - // (Required) Indicate whether empty on-time panes should be omitted. - OnTimeBehavior OnTimeBehavior = 9; - - // (Required) Whether or not the window fn assigns inputs to exactly one window - // - // This knowledge is required for some optimizations - bool assigns_to_one_window = 10; } // Whether or not a PCollection's WindowFn is non-merging, merging, or @@ -489,17 +478,6 @@ enum ClosingBehavior { EMIT_IF_NONEMPTY = 1; } -// Controls whether or not an aggregating transform should output data -// when an on-time pane is empty. -enum OnTimeBehavior { - // Always fire the on-time pane. Even if there is no new data since - // the previous firing, an element will be produced. - FIRE_ALWAYS = 0; - - // Only fire the on-time pane if there is new data since the previous firing. - FIRE_IF_NONEMPTY = 1; -} - // When a number of windowed, timestamped inputs are aggregated, the timestamp // for the resulting output. enum OutputTime { http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/build-tools/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/build-tools/pom.xml b/sdks/java/build-tools/pom.xml index d7d25f6..5a2c498 100644 --- a/sdks/java/build-tools/pom.xml +++ b/sdks/java/build-tools/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../../../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml ---------------------------------------------------------------------- diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml index 0c9080d..3430750 100644 --- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml +++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml @@ -412,13 +412,4 @@ <!-- PTransforms do not actually support serialization. --> </Match> - <Match> - <Class name="org.apache.beam.sdk.options.ProxyInvocationHandler"/> - <Field name="~.*"/> - <Bug pattern="SE_BAD_FIELD"/> - <!-- - ProxyInvocationHandler implements Serializable only for the sake of throwing an informative - exception in writeObject() - --> - </Match> </FindBugsFilter> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index 3f12dc4..11b68e6 100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ShardedKeyCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ShardedKeyCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ShardedKeyCoder.java deleted file mode 100644 index a86b198..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ShardedKeyCoder.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.coders; - -import com.google.common.annotations.VisibleForTesting; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.sdk.values.ShardedKey; - -/** A {@link Coder} for {@link ShardedKey}, using a wrapped key {@link Coder}. */ -@VisibleForTesting -public class ShardedKeyCoder<KeyT> extends StructuredCoder<ShardedKey<KeyT>> { - public static <KeyT> ShardedKeyCoder<KeyT> of(Coder<KeyT> keyCoder) { - return new ShardedKeyCoder<>(keyCoder); - } - - private final Coder<KeyT> keyCoder; - private final VarIntCoder shardNumberCoder; - - protected ShardedKeyCoder(Coder<KeyT> keyCoder) { - this.keyCoder = keyCoder; - this.shardNumberCoder = VarIntCoder.of(); - } - - @Override - public List<? extends Coder<?>> getCoderArguments() { - return Arrays.asList(keyCoder); - } - - @Override - public void encode(ShardedKey<KeyT> key, OutputStream outStream) - throws IOException { - keyCoder.encode(key.getKey(), outStream); - shardNumberCoder.encode(key.getShardNumber(), outStream); - } - - @Override - public ShardedKey<KeyT> decode(InputStream inStream) - throws IOException { - return ShardedKey.of(keyCoder.decode(inStream), shardNumberCoder.decode(inStream)); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - keyCoder.verifyDeterministic(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 89cadbd..4143db2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; import com.google.common.collect.ImmutableMap; @@ -34,7 +35,6 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.Read.Bounded; import org.apache.beam.sdk.io.fs.ResourceId; @@ -43,7 +43,6 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.values.PBegin; @@ -53,19 +52,18 @@ import org.apache.beam.sdk.values.PDone; /** * {@link PTransform}s for reading and writing Avro files. * - * <p>To read a {@link PCollection} from one or more Avro files, use {@code AvroIO.read()}, using - * {@link AvroIO.Read#from} to specify the filename or filepattern to read from. See {@link - * FileSystems} for information on supported file systems and filepatterns. + * <p>To read a {@link PCollection} from one or more Avro files, use {@code AvroIO.read()}, + * using {@link AvroIO.Read#from} to specify the filename or filepattern to read from. + * See {@link FileSystems} for information on supported file systems and filepatterns. * - * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read - * {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes a - * {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a + * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. + * To read {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes + * a {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified * schema. * * <p>For example: - * - * <pre>{@code + * <pre> {@code * Pipeline p = ...; * * // A simple Read of a local file (only runs locally): @@ -77,33 +75,34 @@ import org.apache.beam.sdk.values.PDone; * PCollection<GenericRecord> records = * p.apply(AvroIO.readGenericRecords(schema) * .from("gs://my_bucket/path/to/records-*.avro")); - * }</pre> + * } </pre> * * <p>To write a {@link PCollection} to one or more Avro files, use {@link AvroIO.Write}, using - * {@code AvroIO.write().to(String)} to specify the output filename prefix. The default {@link - * DefaultFilenamePolicy} will use this prefix, in conjunction with a {@link ShardNameTemplate} (set - * via {@link Write#withShardNameTemplate(String)}) and optional filename suffix (set via {@link - * Write#withSuffix(String)}, to generate output filenames in a sharded way. You can override this - * default write filename policy using {@link Write#to(FileBasedSink.FilenamePolicy)} to specify a - * custom file naming policy. + * {@code AvroIO.write().to(String)} to specify the output filename prefix. The default + * {@link DefaultFilenamePolicy} will use this prefix, in conjunction with a + * {@link ShardNameTemplate} (set via {@link Write#withShardNameTemplate(String)}) and optional + * filename suffix (set via {@link Write#withSuffix(String)}, to generate output filenames in a + * sharded way. You can override this default write filename policy using + * {@link Write#withFilenamePolicy(FileBasedSink.FilenamePolicy)} to specify a custom file naming + * policy. * * <p>By default, all input is put into the global window before writing. If per-window writes are - * desired - for example, when using a streaming runner - {@link AvroIO.Write#withWindowedWrites()} - * will cause windowing and triggering to be preserved. When producing windowed writes with a - * streaming runner that supports triggers, the number of output shards must be set explicitly using - * {@link AvroIO.Write#withNumShards(int)}; some runners may set this for you to a runner-chosen - * value, so you may need not set it yourself. A {@link FileBasedSink.FilenamePolicy} must be set, - * and unique windows and triggers must produce unique filenames. + * desired - for example, when using a streaming runner - + * {@link AvroIO.Write#withWindowedWrites()} will cause windowing and triggering to be + * preserved. When producing windowed writes, the number of output shards must be set explicitly + * using {@link AvroIO.Write#withNumShards(int)}; some runners may set this for you to a + * runner-chosen value, so you may need not set it yourself. A + * {@link FileBasedSink.FilenamePolicy} must be set, and unique windows and triggers must produce + * unique filenames. * - * <p>To write specific records, such as Avro-generated classes, use {@link #write(Class)}. To write - * {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)} which takes - * a {@link Schema} object, or {@link #writeGenericRecords(String)} which takes a schema in a - * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified - * schema. + * <p>To write specific records, such as Avro-generated classes, use {@link #write(Class)}. + * To write {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)} + * which takes a {@link Schema} object, or {@link #writeGenericRecords(String)} which takes a schema + * in a JSON-encoded string form. An exception will be thrown if a record doesn't match the + * specified schema. * * <p>For example: - * - * <pre>{@code + * <pre> {@code * // A simple Write to a local file (only runs locally): * PCollection<AvroAutoGenClass> records = ...; * records.apply(AvroIO.write(AvroAutoGenClass.class).to("/path/to/file.avro")); @@ -114,11 +113,11 @@ import org.apache.beam.sdk.values.PDone; * records.apply("WriteToAvro", AvroIO.writeGenericRecords(schema) * .to("gs://my_bucket/path/to/numbers") * .withSuffix(".avro")); - * }</pre> + * } </pre> * - * <p>By default, {@link AvroIO.Write} produces output files that are compressed using the {@link - * org.apache.avro.file.Codec CodecFactory.deflateCodec(6)}. This default can be changed or - * overridden using {@link AvroIO.Write#withCodec}. + * <p>By default, {@link AvroIO.Write} produces output files that are compressed using the + * {@link org.apache.avro.file.Codec CodecFactory.deflateCodec(6)}. This default can + * be changed or overridden using {@link AvroIO.Write#withCodec}. */ public class AvroIO { /** @@ -259,16 +258,11 @@ public class AvroIO { @Nullable abstract ValueProvider<ResourceId> getFilenamePrefix(); @Nullable abstract String getShardTemplate(); @Nullable abstract String getFilenameSuffix(); - - @Nullable - abstract ValueProvider<ResourceId> getTempDirectory(); - abstract int getNumShards(); @Nullable abstract Class<T> getRecordClass(); @Nullable abstract Schema getSchema(); abstract boolean getWindowedWrites(); @Nullable abstract FilenamePolicy getFilenamePolicy(); - /** * The codec used to encode the blocks in the Avro file. String value drawn from those in * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html @@ -283,9 +277,6 @@ public class AvroIO { abstract static class Builder<T> { abstract Builder<T> setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix); abstract Builder<T> setFilenameSuffix(String filenameSuffix); - - abstract Builder<T> setTempDirectory(ValueProvider<ResourceId> tempDirectory); - abstract Builder<T> setNumShards(int numShards); abstract Builder<T> setShardTemplate(String shardTemplate); abstract Builder<T> setRecordClass(Class<T> recordClass); @@ -305,9 +296,9 @@ public class AvroIO { * <p>The name of the output files will be determined by the {@link FilenamePolicy} used. * * <p>By default, a {@link DefaultFilenamePolicy} will build output filenames using the - * specified prefix, a shard name template (see {@link #withShardNameTemplate(String)}, and a - * common suffix (if supplied using {@link #withSuffix(String)}). This default can be overridden - * using {@link #to(FilenamePolicy)}. + * specified prefix, a shard name template (see {@link #withShardNameTemplate(String)}, and + * a common suffix (if supplied using {@link #withSuffix(String)}). This default can be + * overridden using {@link #withFilenamePolicy(FilenamePolicy)}. */ public Write<T> to(String outputPrefix) { return to(FileBasedSink.convertToFileResourceIfPossible(outputPrefix)); @@ -315,21 +306,14 @@ public class AvroIO { /** * Writes to file(s) with the given output prefix. See {@link FileSystems} for information on - * supported file systems. This prefix is used by the {@link DefaultFilenamePolicy} to generate - * filenames. - * - * <p>By default, a {@link DefaultFilenamePolicy} will build output filenames using the - * specified prefix, a shard name template (see {@link #withShardNameTemplate(String)}, and a - * common suffix (if supplied using {@link #withSuffix(String)}). This default can be overridden - * using {@link #to(FilenamePolicy)}. + * supported file systems. * - * <p>This default policy can be overridden using {@link #to(FilenamePolicy)}, in which case - * {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should not be set. - * Custom filename policies do not automatically see this prefix - you should explicitly pass - * the prefix into your {@link FilenamePolicy} object if you need this. + * <p>The name of the output files will be determined by the {@link FilenamePolicy} used. * - * <p>If {@link #withTempDirectory} has not been called, this filename prefix will be used to - * infer a directory for temporary files. + * <p>By default, a {@link DefaultFilenamePolicy} will build output filenames using the + * specified prefix, a shard name template (see {@link #withShardNameTemplate(String)}, and + * a common suffix (if supplied using {@link #withSuffix(String)}). This default can be + * overridden using {@link #withFilenamePolicy(FilenamePolicy)}. */ @Experimental(Kind.FILESYSTEM) public Write<T> to(ResourceId outputPrefix) { @@ -358,22 +342,15 @@ public class AvroIO { } /** - * Writes to files named according to the given {@link FileBasedSink.FilenamePolicy}. A - * directory for temporary files must be specified using {@link #withTempDirectory}. + * Configures the {@link FileBasedSink.FilenamePolicy} that will be used to name written files. */ - public Write<T> to(FilenamePolicy filenamePolicy) { + public Write<T> withFilenamePolicy(FilenamePolicy filenamePolicy) { return toBuilder().setFilenamePolicy(filenamePolicy).build(); } - /** Set the base directory used to generate temporary files. */ - @Experimental(Kind.FILESYSTEM) - public Write<T> withTempDirectory(ValueProvider<ResourceId> tempDirectory) { - return toBuilder().setTempDirectory(tempDirectory).build(); - } - /** * Uses the given {@link ShardNameTemplate} for naming output files. This option may only be - * used when using one of the default filename-prefix to() overrides. + * used when {@link #withFilenamePolicy(FilenamePolicy)} has not been configured. * * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are * used. @@ -383,8 +360,8 @@ public class AvroIO { } /** - * Configures the filename suffix for written files. This option may only be used when using one - * of the default filename-prefix to() overrides. + * Configures the filename suffix for written files. This option may only be used when + * {@link #withFilenamePolicy(FilenamePolicy)} has not been configured. * * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are * used. @@ -425,8 +402,9 @@ public class AvroIO { /** * Preserves windowing of input elements and writes them to files based on the element's window. * - * <p>If using {@link #to(FileBasedSink.FilenamePolicy)}. Filenames will be generated using - * {@link FilenamePolicy#windowedFilename}. See also {@link WriteFiles#withWindowedWrites()}. + * <p>Requires use of {@link #withFilenamePolicy(FileBasedSink.FilenamePolicy)}. Filenames will + * be generated using {@link FilenamePolicy#windowedFilename}. See also + * {@link WriteFiles#withWindowedWrites()}. */ public Write<T> withWindowedWrites() { return toBuilder().setWindowedWrites(true).build(); @@ -457,46 +435,32 @@ public class AvroIO { return toBuilder().setMetadata(ImmutableMap.copyOf(metadata)).build(); } - DynamicDestinations<T, Void> resolveDynamicDestinations() { - FilenamePolicy usedFilenamePolicy = getFilenamePolicy(); - if (usedFilenamePolicy == null) { - usedFilenamePolicy = - DefaultFilenamePolicy.fromStandardParameters( - getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), getWindowedWrites()); - } - return DynamicFileDestinations.constant(usedFilenamePolicy); - } - @Override public PDone expand(PCollection<T> input) { - checkArgument( - getFilenamePrefix() != null || getTempDirectory() != null, - "Need to set either the filename prefix or the tempDirectory of a AvroIO.Write " - + "transform."); - if (getFilenamePolicy() != null) { - checkArgument( - getShardTemplate() == null && getFilenameSuffix() == null, - "shardTemplate and filenameSuffix should only be used with the default " - + "filename policy"); - } - return expandTyped(input, resolveDynamicDestinations()); - } + checkState(getFilenamePrefix() != null, + "Need to set the filename prefix of an AvroIO.Write transform."); + checkState( + (getFilenamePolicy() == null) + || (getShardTemplate() == null && getFilenameSuffix() == null), + "Cannot set a filename policy and also a filename template or suffix."); + checkState(getSchema() != null, + "Need to set the schema of an AvroIO.Write transform."); + checkState(!getWindowedWrites() || (getFilenamePolicy() != null), + "When using windowed writes, a filename policy must be set via withFilenamePolicy()."); - public <DestinationT> PDone expandTyped( - PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) { - ValueProvider<ResourceId> tempDirectory = getTempDirectory(); - if (tempDirectory == null) { - tempDirectory = getFilenamePrefix(); + FilenamePolicy usedFilenamePolicy = getFilenamePolicy(); + if (usedFilenamePolicy == null) { + usedFilenamePolicy = DefaultFilenamePolicy.constructUsingStandardParameters( + getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), getWindowedWrites()); } - WriteFiles<T, DestinationT, T> write = - WriteFiles.to( - new AvroSink<>( - tempDirectory, - dynamicDestinations, - AvroCoder.of(getRecordClass(), getSchema()), - getCodec(), - getMetadata()), - SerializableFunctions.<T>identity()); + + WriteFiles<T> write = WriteFiles.to( + new AvroSink<>( + getFilenamePrefix(), + usedFilenamePolicy, + AvroCoder.of(getRecordClass(), getSchema()), + getCodec(), + getMetadata())); if (getNumShards() > 0) { write = write.withNumShards(getNumShards()); } @@ -509,25 +473,31 @@ public class AvroIO { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - resolveDynamicDestinations().populateDisplayData(builder); - - String tempDirectory = null; - if (getTempDirectory() != null) { - tempDirectory = - getTempDirectory().isAccessible() - ? getTempDirectory().get().toString() - : getTempDirectory().toString(); + checkState( + getFilenamePrefix() != null, + "Unable to populate DisplayData for invalid AvroIO.Write (unset output prefix)."); + String outputPrefixString = null; + if (getFilenamePrefix().isAccessible()) { + ResourceId dir = getFilenamePrefix().get(); + outputPrefixString = dir.toString(); + } else { + outputPrefixString = getFilenamePrefix().toString(); } builder - .add(DisplayData.item("schema", getRecordClass()).withLabel("Record Schema")) - .addIfNotDefault( - DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"), 0) - .addIfNotDefault( - DisplayData.item("codec", getCodec().toString()).withLabel("Avro Compression Codec"), - DEFAULT_CODEC.toString()) - .addIfNotNull( - DisplayData.item("tempDirectory", tempDirectory) - .withLabel("Directory for temporary files")); + .add(DisplayData.item("schema", getRecordClass()) + .withLabel("Record Schema")) + .addIfNotNull(DisplayData.item("filePrefix", outputPrefixString) + .withLabel("Output File Prefix")) + .addIfNotNull(DisplayData.item("shardNameTemplate", getShardTemplate()) + .withLabel("Output Shard Name Template")) + .addIfNotNull(DisplayData.item("fileSuffix", getFilenameSuffix()) + .withLabel("Output File Suffix")) + .addIfNotDefault(DisplayData.item("numShards", getNumShards()) + .withLabel("Maximum Output Shards"), + 0) + .addIfNotDefault(DisplayData.item("codec", getCodec().toString()) + .withLabel("Avro Compression Codec"), + DEFAULT_CODEC.toString()); builder.include("Metadata", new Metadata()); } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java index c78870b..6c36266 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java @@ -32,40 +32,39 @@ import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.util.MimeTypes; /** A {@link FileBasedSink} for Avro files. */ -class AvroSink<T, DestinationT> extends FileBasedSink<T, DestinationT> { +class AvroSink<T> extends FileBasedSink<T> { private final AvroCoder<T> coder; private final SerializableAvroCodecFactory codec; private final ImmutableMap<String, Object> metadata; AvroSink( ValueProvider<ResourceId> outputPrefix, - DynamicDestinations<T, DestinationT> dynamicDestinations, + FilenamePolicy filenamePolicy, AvroCoder<T> coder, SerializableAvroCodecFactory codec, ImmutableMap<String, Object> metadata) { // Avro handle compression internally using the codec. - super(outputPrefix, dynamicDestinations, CompressionType.UNCOMPRESSED); + super(outputPrefix, filenamePolicy, CompressionType.UNCOMPRESSED); this.coder = coder; this.codec = codec; this.metadata = metadata; } @Override - public WriteOperation<T, DestinationT> createWriteOperation() { + public WriteOperation<T> createWriteOperation() { return new AvroWriteOperation<>(this, coder, codec, metadata); } /** A {@link WriteOperation WriteOperation} for Avro files. */ - private static class AvroWriteOperation<T, DestinationT> extends WriteOperation<T, DestinationT> { + private static class AvroWriteOperation<T> extends WriteOperation<T> { private final AvroCoder<T> coder; private final SerializableAvroCodecFactory codec; private final ImmutableMap<String, Object> metadata; - private AvroWriteOperation( - AvroSink<T, DestinationT> sink, - AvroCoder<T> coder, - SerializableAvroCodecFactory codec, - ImmutableMap<String, Object> metadata) { + private AvroWriteOperation(AvroSink<T> sink, + AvroCoder<T> coder, + SerializableAvroCodecFactory codec, + ImmutableMap<String, Object> metadata) { super(sink); this.coder = coder; this.codec = codec; @@ -73,23 +72,22 @@ class AvroSink<T, DestinationT> extends FileBasedSink<T, DestinationT> { } @Override - public Writer<T, DestinationT> createWriter() throws Exception { + public Writer<T> createWriter() throws Exception { return new AvroWriter<>(this, coder, codec, metadata); } } /** A {@link Writer Writer} for Avro files. */ - private static class AvroWriter<T, DestinationT> extends Writer<T, DestinationT> { + private static class AvroWriter<T> extends Writer<T> { private final AvroCoder<T> coder; private DataFileWriter<T> dataFileWriter; private SerializableAvroCodecFactory codec; private final ImmutableMap<String, Object> metadata; - public AvroWriter( - WriteOperation<T, DestinationT> writeOperation, - AvroCoder<T> coder, - SerializableAvroCodecFactory codec, - ImmutableMap<String, Object> metadata) { + public AvroWriter(WriteOperation<T> writeOperation, + AvroCoder<T> coder, + SerializableAvroCodecFactory codec, + ImmutableMap<String, Object> metadata) { super(writeOperation, MimeTypes.BINARY); this.coder = coder; this.codec = codec; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java index 4baac36..6ab8dec 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java @@ -96,6 +96,12 @@ public class CompressedSource<T> extends FileBasedSource<T> { */ ReadableByteChannel createDecompressingChannel(String fileName, ReadableByteChannel channel) throws IOException; + + /** + * Given a file name, returns true if the file name matches any supported compression + * scheme. + */ + boolean isCompressed(String fileName); } /** @@ -236,16 +242,6 @@ public class CompressedSource<T> extends FileBasedSource<T> { @Override public abstract ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel) throws IOException; - - /** Returns whether the file's extension matches of one of the known compression formats. */ - public static boolean isCompressed(String filename) { - for (CompressionMode type : CompressionMode.values()) { - if (type.matches(filename)) { - return true; - } - } - return false; - } } /** @@ -277,6 +273,16 @@ public class CompressedSource<T> extends FileBasedSource<T> { ReadableByteChannel.class.getSimpleName(), ReadableByteChannel.class.getSimpleName())); } + + @Override + public boolean isCompressed(String fileName) { + for (CompressionMode type : CompressionMode.values()) { + if (type.matches(fileName)) { + return true; + } + } + return false; + } } private final FileBasedSource<T> sourceDelegate; @@ -360,9 +366,13 @@ public class CompressedSource<T> extends FileBasedSource<T> { */ @Override protected final boolean isSplittable() throws Exception { - return channelFactory instanceof FileNameBasedDecompressingChannelFactory - && !CompressionMode.isCompressed(getFileOrPatternSpec()) - && sourceDelegate.isSplittable(); + if (channelFactory instanceof FileNameBasedDecompressingChannelFactory) { + FileNameBasedDecompressingChannelFactory fileNameBasedChannelFactory = + (FileNameBasedDecompressingChannelFactory) channelFactory; + return !fileNameBasedChannelFactory.isCompressed(getFileOrPatternSpec()) + && sourceDelegate.isSplittable(); + } + return false; } /** @@ -376,7 +386,9 @@ public class CompressedSource<T> extends FileBasedSource<T> { @Override protected final FileBasedReader<T> createSingleFileReader(PipelineOptions options) { if (channelFactory instanceof FileNameBasedDecompressingChannelFactory) { - if (!CompressionMode.isCompressed(getFileOrPatternSpec())) { + FileNameBasedDecompressingChannelFactory fileNameBasedChannelFactory = + (FileNameBasedDecompressingChannelFactory) channelFactory; + if (!fileNameBasedChannelFactory.isCompressed(getFileOrPatternSpec())) { return sourceDelegate.createSingleFileReader(options); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java index 7a60e49..f9e4ac4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java @@ -20,31 +20,25 @@ package org.apache.beam.sdk.io; import static com.google.common.base.MoreObjects.firstNonNull; import com.google.common.annotations.VisibleForTesting; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; import java.text.DecimalFormat; import java.util.Arrays; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; -import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A default {@link FilenamePolicy} for windowed and unwindowed files. This policy is constructed @@ -57,7 +51,10 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; * {@code WriteOneFilePerWindow} example pipeline. */ public final class DefaultFilenamePolicy extends FilenamePolicy { - /** The default sharding name template. */ + + private static final Logger LOG = LoggerFactory.getLogger(DefaultFilenamePolicy.class); + + /** The default sharding name template used in {@link #constructUsingStandardParameters}. */ public static final String DEFAULT_UNWINDOWED_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; /** The default windowed sharding name template used when writing windowed files. @@ -70,184 +67,75 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { "W-P" + DEFAULT_UNWINDOWED_SHARD_TEMPLATE; /* - * pattern for both windowed and non-windowed file names. + * pattern for both windowed and non-windowed file names */ private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+|W|P)"); /** - * Encapsulates constructor parameters to {@link DefaultFilenamePolicy}. - * - * <p>This is used as the {@code DestinationT} argument to allow {@link DefaultFilenamePolicy} - * objects to be dynamically generated. - */ - public static class Params implements Serializable { - private final ValueProvider<ResourceId> baseFilename; - private final String shardTemplate; - private final boolean explicitTemplate; - private final String suffix; - - /** - * Construct a default Params object. The shard template will be set to the default {@link - * #DEFAULT_UNWINDOWED_SHARD_TEMPLATE} value. - */ - public Params() { - this.baseFilename = null; - this.shardTemplate = DEFAULT_UNWINDOWED_SHARD_TEMPLATE; - this.suffix = ""; - this.explicitTemplate = false; - } - - private Params( - ValueProvider<ResourceId> baseFilename, - String shardTemplate, - String suffix, - boolean explicitTemplate) { - this.baseFilename = baseFilename; - this.shardTemplate = shardTemplate; - this.suffix = suffix; - this.explicitTemplate = explicitTemplate; - } - - /** - * Specify that writes are windowed. This affects the default shard template, changing it to - * {@link #DEFAULT_WINDOWED_SHARD_TEMPLATE}. - */ - public Params withWindowedWrites() { - String template = this.shardTemplate; - if (!explicitTemplate) { - template = DEFAULT_WINDOWED_SHARD_TEMPLATE; - } - return new Params(baseFilename, template, suffix, explicitTemplate); - } - - /** Sets the base filename. */ - public Params withBaseFilename(ResourceId baseFilename) { - return withBaseFilename(StaticValueProvider.of(baseFilename)); - } - - /** Like {@link #withBaseFilename(ResourceId)}, but takes in a {@link ValueProvider}. */ - public Params withBaseFilename(ValueProvider<ResourceId> baseFilename) { - return new Params(baseFilename, shardTemplate, suffix, explicitTemplate); - } - - /** Sets the shard template. */ - public Params withShardTemplate(String shardTemplate) { - return new Params(baseFilename, shardTemplate, suffix, true); - } - - /** Sets the suffix. */ - public Params withSuffix(String suffix) { - return new Params(baseFilename, shardTemplate, suffix, explicitTemplate); - } - } - - /** A Coder for {@link Params}. */ - public static class ParamsCoder extends AtomicCoder<Params> { - private static final ParamsCoder INSTANCE = new ParamsCoder(); - private Coder<String> stringCoder = StringUtf8Coder.of(); - - public static ParamsCoder of() { - return INSTANCE; - } - - @Override - public void encode(Params value, OutputStream outStream) throws IOException { - if (value == null) { - throw new CoderException("cannot encode a null value"); - } - stringCoder.encode(value.baseFilename.get().toString(), outStream); - stringCoder.encode(value.shardTemplate, outStream); - stringCoder.encode(value.suffix, outStream); - } - - @Override - public Params decode(InputStream inStream) throws IOException { - ResourceId prefix = - FileBasedSink.convertToFileResourceIfPossible(stringCoder.decode(inStream)); - String shardTemplate = stringCoder.decode(inStream); - String suffix = stringCoder.decode(inStream); - return new Params() - .withBaseFilename(prefix) - .withShardTemplate(shardTemplate) - .withSuffix(suffix); - } - } - - private final Params params; - /** * Constructs a new {@link DefaultFilenamePolicy}. * * @see DefaultFilenamePolicy for more information on the arguments to this function. */ @VisibleForTesting - DefaultFilenamePolicy(Params params) { - this.params = params; + DefaultFilenamePolicy(ValueProvider<String> prefix, String shardTemplate, String suffix) { + this.prefix = prefix; + this.shardTemplate = shardTemplate; + this.suffix = suffix; } /** - * Construct a {@link DefaultFilenamePolicy}. + * A helper function to construct a {@link DefaultFilenamePolicy} using the standard filename + * parameters, namely a provided {@link ResourceId} for the output prefix, and possibly-null + * shard name template and suffix. * - * <p>This is a shortcut for: + * <p>Any filename component of the provided resource will be used as the filename prefix. * - * <pre>{@code - * DefaultFilenamePolicy.fromParams(new Params() - * .withBaseFilename(baseFilename) - * .withShardTemplate(shardTemplate) - * .withSuffix(filenameSuffix) - * .withWindowedWrites()) - * }</pre> + * <p>If provided, the shard name template will be used; otherwise + * {@link #DEFAULT_UNWINDOWED_SHARD_TEMPLATE} will be used for non-windowed file names and + * {@link #DEFAULT_WINDOWED_SHARD_TEMPLATE} will be used for windowed file names. * - * <p>Where the respective {@code with} methods are invoked only if the value is non-null or true. + * <p>If provided, the suffix will be used; otherwise the files will have an empty suffix. */ - public static DefaultFilenamePolicy fromStandardParameters( - ValueProvider<ResourceId> baseFilename, + public static DefaultFilenamePolicy constructUsingStandardParameters( + ValueProvider<ResourceId> outputPrefix, @Nullable String shardTemplate, @Nullable String filenameSuffix, boolean windowedWrites) { - Params params = new Params().withBaseFilename(baseFilename); - if (shardTemplate != null) { - params = params.withShardTemplate(shardTemplate); - } - if (filenameSuffix != null) { - params = params.withSuffix(filenameSuffix); - } - if (windowedWrites) { - params = params.withWindowedWrites(); - } - return fromParams(params); + // Pick the appropriate default policy based on whether windowed writes are being performed. + String defaultTemplate = + windowedWrites ? DEFAULT_WINDOWED_SHARD_TEMPLATE : DEFAULT_UNWINDOWED_SHARD_TEMPLATE; + return new DefaultFilenamePolicy( + NestedValueProvider.of(outputPrefix, new ExtractFilename()), + firstNonNull(shardTemplate, defaultTemplate), + firstNonNull(filenameSuffix, "")); } - /** Construct a {@link DefaultFilenamePolicy} from a {@link Params} object. */ - public static DefaultFilenamePolicy fromParams(Params params) { - return new DefaultFilenamePolicy(params); - } + private final ValueProvider<String> prefix; + private final String shardTemplate; + private final String suffix; /** * Constructs a fully qualified name from components. * - * <p>The name is built from a base filename, shard template (with shard numbers applied), and a - * suffix. All components are required, but may be empty strings. + * <p>The name is built from a prefix, shard template (with shard numbers + * applied), and a suffix. All components are required, but may be empty + * strings. * - * <p>Within a shard template, repeating sequences of the letters "S" or "N" are replaced with the - * shard number, or number of shards respectively. "P" is replaced with by stringification of - * current pane. "W" is replaced by stringification of current window. + * <p>Within a shard template, repeating sequences of the letters "S" or "N" + * are replaced with the shard number, or number of shards respectively. + * "P" is replaced with by stringification of current pane. + * "W" is replaced by stringification of current window. * - * <p>The numbers are formatted with leading zeros to match the length of the repeated sequence of - * letters. + * <p>The numbers are formatted with leading zeros to match the length of the + * repeated sequence of letters. * - * <p>For example, if baseFilename = "path/to/output", shardTemplate = "-SSS-of-NNN", and suffix = - * ".txt", with shardNum = 1 and numShards = 100, the following is produced: - * "path/to/output-001-of-100.txt". + * <p>For example, if prefix = "output", shardTemplate = "-SSS-of-NNN", and + * suffix = ".txt", with shardNum = 1 and numShards = 100, the following is + * produced: "output-001-of-100.txt". */ - static ResourceId constructName( - ResourceId baseFilename, - String shardTemplate, - String suffix, - int shardNum, - int numShards, - String paneStr, - String windowStr) { - String prefix = extractFilename(baseFilename); + static String constructName( + String prefix, String shardTemplate, String suffix, int shardNum, int numShards, + String paneStr, String windowStr) { // Matcher API works with StringBuffer, rather than StringBuilder. StringBuffer sb = new StringBuffer(); sb.append(prefix); @@ -277,37 +165,27 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { m.appendTail(sb); sb.append(suffix); - return baseFilename - .getCurrentDirectory() - .resolve(sb.toString(), StandardResolveOptions.RESOLVE_FILE); + return sb.toString(); } @Override @Nullable - public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) { - return constructName( - params.baseFilename.get(), - params.shardTemplate, - params.suffix + outputFileHints.getSuggestedFilenameSuffix(), - context.getShardNumber(), - context.getNumShards(), - null, - null); + public ResourceId unwindowedFilename(ResourceId outputDirectory, Context context, + String extension) { + String filename = constructName(prefix.get(), shardTemplate, suffix, context.getShardNumber(), + context.getNumShards(), null, null) + extension; + return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE); } @Override - public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) { + public ResourceId windowedFilename(ResourceId outputDirectory, + WindowedContext context, String extension) { final PaneInfo paneInfo = context.getPaneInfo(); String paneStr = paneInfoToString(paneInfo); String windowStr = windowToString(context.getWindow()); - return constructName( - params.baseFilename.get(), - params.shardTemplate, - params.suffix + outputFileHints.getSuggestedFilenameSuffix(), - context.getShardNumber(), - context.getNumShards(), - paneStr, - windowStr); + String filename = constructName(prefix.get(), shardTemplate, suffix, context.getShardNumber(), + context.getNumShards(), paneStr, windowStr) + extension; + return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE); } /* @@ -338,32 +216,24 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { @Override public void populateDisplayData(DisplayData.Builder builder) { String filenamePattern; - if (params.baseFilename.isAccessible()) { - filenamePattern = - String.format("%s%s%s", params.baseFilename.get(), params.shardTemplate, params.suffix); + if (prefix.isAccessible()) { + filenamePattern = String.format("%s%s%s", prefix.get(), shardTemplate, suffix); } else { - filenamePattern = - String.format("%s%s%s", params.baseFilename, params.shardTemplate, params.suffix); + filenamePattern = String.format("%s%s%s", prefix, shardTemplate, suffix); } - - String outputPrefixString = null; - outputPrefixString = - params.baseFilename.isAccessible() - ? params.baseFilename.get().toString() - : params.baseFilename.toString(); - builder.add(DisplayData.item("filenamePattern", filenamePattern).withLabel("Filename Pattern")); - builder.add(DisplayData.item("filePrefix", outputPrefixString).withLabel("Output File Prefix")); - builder.add(DisplayData.item("fileSuffix", params.suffix).withLabel("Output file Suffix")); builder.add( - DisplayData.item("shardNameTemplate", params.shardTemplate) - .withLabel("Output Shard Name Template")); + DisplayData.item("filenamePattern", filenamePattern) + .withLabel("Filename Pattern")); } - private static String extractFilename(ResourceId input) { - if (input.isDirectory()) { - return ""; - } else { - return firstNonNull(input.getFilename(), ""); + private static class ExtractFilename implements SerializableFunction<ResourceId, String> { + @Override + public String apply(ResourceId input) { + if (input.isDirectory()) { + return ""; + } else { + return firstNonNull(input.getFilename(), ""); + } } } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java deleted file mode 100644 index e7ef0f6..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.io; - -import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params; -import org.apache.beam.sdk.io.DefaultFilenamePolicy.ParamsCoder; -import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations; -import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.display.DisplayData; - -/** Some helper classes that derive from {@link FileBasedSink.DynamicDestinations}. */ -public class DynamicFileDestinations { - /** Always returns a constant {@link FilenamePolicy}. */ - private static class ConstantFilenamePolicy<T> extends DynamicDestinations<T, Void> { - private final FilenamePolicy filenamePolicy; - - public ConstantFilenamePolicy(FilenamePolicy filenamePolicy) { - this.filenamePolicy = filenamePolicy; - } - - @Override - public Void getDestination(T element) { - return (Void) null; - } - - @Override - public Coder<Void> getDestinationCoder() { - return null; - } - - @Override - public Void getDefaultDestination() { - return (Void) null; - } - - @Override - public FilenamePolicy getFilenamePolicy(Void destination) { - return filenamePolicy; - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - filenamePolicy.populateDisplayData(builder); - } - } - - /** - * A base class for a {@link DynamicDestinations} object that returns differently-configured - * instances of {@link DefaultFilenamePolicy}. - */ - private static class DefaultPolicyDestinations<UserT> extends DynamicDestinations<UserT, Params> { - SerializableFunction<UserT, Params> destinationFunction; - Params emptyDestination; - - public DefaultPolicyDestinations( - SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) { - this.destinationFunction = destinationFunction; - this.emptyDestination = emptyDestination; - } - - @Override - public Params getDestination(UserT element) { - return destinationFunction.apply(element); - } - - @Override - public Params getDefaultDestination() { - return emptyDestination; - } - - @Nullable - @Override - public Coder<Params> getDestinationCoder() { - return ParamsCoder.of(); - } - - @Override - public FilenamePolicy getFilenamePolicy(DefaultFilenamePolicy.Params params) { - return DefaultFilenamePolicy.fromParams(params); - } - } - - /** Returns a {@link DynamicDestinations} that always returns the same {@link FilenamePolicy}. */ - public static <T> DynamicDestinations<T, Void> constant(FilenamePolicy filenamePolicy) { - return new ConstantFilenamePolicy<>(filenamePolicy); - } - - /** - * Returns a {@link DynamicDestinations} that returns instances of {@link DefaultFilenamePolicy} - * configured with the given {@link Params}. - */ - public static <UserT> DynamicDestinations<UserT, Params> toDefaultPolicies( - SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) { - return new DefaultPolicyDestinations<>(destinationFunction, emptyDestination); - } -}
