Add a Local Java Core Module. This contains utilities shared by runners which execute locally. These runners are expected to be the DirectRunner and the ReferenceRunner, both of which can utilize shared in-memory representations of the state of an executing Pipeline.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4babcbf4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4babcbf4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4babcbf4 Branch: refs/heads/master Commit: 4babcbf46e0a4bada90a77f908eeaa7870138ea9 Parents: 50e58f0 Author: Thomas Groh <tg...@google.com> Authored: Wed Oct 25 16:31:09 2017 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Fri Nov 17 17:23:41 2017 -0800 ---------------------------------------------------------------------- pom.xml | 6 ++ runners/direct-java/pom.xml | 5 ++ .../beam/runners/direct/CommittedBundle.java | 9 ++- .../direct/ImmutableListBundleFactory.java | 8 +++ .../beam/runners/direct/WatermarkManager.java | 4 +- .../direct/ImmutableListBundleFactoryTest.java | 4 +- runners/local-java/pom.xml | 62 ++++++++++++++++++++ .../apache/beam/runners/core/local/Bundle.java | 33 +++++++++++ .../beam/runners/core/local/package-info.java | 22 +++++++ runners/pom.xml | 1 + 10 files changed, 145 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 7bf9d66..c5223df 100644 --- a/pom.xml +++ b/pom.xml @@ -702,6 +702,12 @@ <dependency> <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-local-java-core</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> <artifactId>beam-runners-direct-java</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/runners/direct-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml index 752af44..ff7b4a8 100644 --- 
a/runners/direct-java/pom.xml +++ b/runners/direct-java/pom.xml @@ -182,6 +182,11 @@ <dependency> <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-local-java-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-core</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java index 79a96fe..ccb031f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java @@ -20,6 +20,7 @@ package org.apache.beam.runners.direct; import javax.annotation.Nullable; import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.core.local.Bundle; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.WindowedValue; @@ -33,7 +34,7 @@ import org.joda.time.Instant; * a part of at a later point. * @param <T> the type of elements contained within this bundle */ -interface CommittedBundle<T> { +interface CommittedBundle<T> extends Bundle<T> { /** * Returns the PCollection that the elements of this bundle belong to. */ @@ -52,10 +53,8 @@ interface CommittedBundle<T> { */ Iterable<WindowedValue<T>> getElements(); - /** - * Returns the minimum timestamp among all of the elements of this {@link CommittedBundle}. 
- */ - Instant getMinTimestamp(); + @Override + Instant getMinimumTimestamp(); /** * Returns the processing time output watermark at the time the producing {@link PTransform} http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java index 73734d0..bea7699 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java @@ -22,6 +22,8 @@ import static com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; import com.google.common.collect.ImmutableList; +import java.util.Iterator; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; @@ -128,6 +130,12 @@ class ImmutableListBundleFactory implements BundleFactory { } @Override + @Nonnull + public Iterator<WindowedValue<T>> iterator() { + return getElements().iterator(); + } + + @Override public CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements) { return create( getPCollection(), http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index 599b74f..a1395a9 100644 --- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -274,7 +274,7 @@ class WatermarkManager { if (!pendingElements.isEmpty()) { minInputWatermark = INSTANT_ORDERING.min( - minInputWatermark, pendingElements.firstEntry().getElement().getMinTimestamp()); + minInputWatermark, pendingElements.firstEntry().getElement().getMinimumTimestamp()); } Instant newWatermark = INSTANT_ORDERING.max(oldWatermark, minInputWatermark); currentWatermark.set(newWatermark); @@ -1511,7 +1511,7 @@ class WatermarkManager { @Override public int compare(CommittedBundle<?> o1, CommittedBundle<?> o2) { return ComparisonChain.start() - .compare(o1.getMinTimestamp(), o2.getMinTimestamp()) + .compare(o1.getMinimumTimestamp(), o2.getMinimumTimestamp()) .result(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java index 4a392db..83426c6 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java @@ -120,7 +120,7 @@ public class ImmutableListBundleFactoryTest { // Sanity check that the test is meaningful. 
assertThat(minElementTs, not(equalTo(commitTime))); - assertThat(committed.getMinTimestamp(), equalTo(minElementTs)); + assertThat(committed.getMinimumTimestamp(), equalTo(minElementTs)); assertThat(committed.getSynchronizedProcessingOutputWatermark(), equalTo(commitTime)); return committed; @@ -190,7 +190,7 @@ public class ImmutableListBundleFactoryTest { assertThat( withed.getSynchronizedProcessingOutputWatermark(), equalTo(committed.getSynchronizedProcessingOutputWatermark())); - assertThat(withed.getMinTimestamp(), equalTo(new Instant(2048L))); + assertThat(withed.getMinimumTimestamp(), equalTo(new Instant(2048L))); } @Test http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/runners/local-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/local-java/pom.xml b/runners/local-java/pom.xml new file mode 100644 index 0000000..b988fd9 --- /dev/null +++ b/runners/local-java/pom.xml @@ -0,0 +1,62 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-parent</artifactId> + <version>2.3.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>beam-runners-local-java-core</artifactId> + + <name>Apache Beam :: Runners :: Local Java Core</name> + + <packaging>jar</packaging> + + <dependencies> + <!-- The Local Java Core artifact should carefully manage the classes made available by this + dependency. The Java SDK should be used to provide common utilities (e.g. Coder, WindowedValue) + but should not be used within this library to execute any UDFs. + TODO: Add an APISurfaceTest to force this to be the case, if possible. --> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + </dependency> + + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + </dependency> + + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </dependency> + + <!-- build dependencies --> + <dependency> + <groupId>com.google.auto.value</groupId> + <artifactId>auto-value</artifactId> + <scope>provided</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/runners/local-java/src/main/java/org/apache/beam/runners/core/local/Bundle.java ---------------------------------------------------------------------- diff --git a/runners/local-java/src/main/java/org/apache/beam/runners/core/local/Bundle.java b/runners/local-java/src/main/java/org/apache/beam/runners/core/local/Bundle.java new file mode 100644 index 0000000..98e1e8a --- /dev/null +++ b/runners/local-java/src/main/java/org/apache/beam/runners/core/local/Bundle.java @@ 
-0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.local; + +import org.apache.beam.sdk.util.WindowedValue; +import org.joda.time.Instant; + +/** An immutable collection of elements which are part of a {@code PCollection}. */ +public interface Bundle<T> extends Iterable<WindowedValue<T>> { + /** + * Return the minimum timestamp among elements in this bundle. + * + * <p>This should be equivalent to iterating over all of the elements within a bundle and + * selecting the minimum timestamp from among them. 
+ */ + Instant getMinimumTimestamp(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/runners/local-java/src/main/java/org/apache/beam/runners/core/local/package-info.java ---------------------------------------------------------------------- diff --git a/runners/local-java/src/main/java/org/apache/beam/runners/core/local/package-info.java b/runners/local-java/src/main/java/org/apache/beam/runners/core/local/package-info.java new file mode 100644 index 0000000..ffd2e54 --- /dev/null +++ b/runners/local-java/src/main/java/org/apache/beam/runners/core/local/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Utilities useful when executing a pipeline on a single machine. 
+ */ +package org.apache.beam.runners.core.local; http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/runners/pom.xml ---------------------------------------------------------------------- diff --git a/runners/pom.xml b/runners/pom.xml index 47f3c0e..814b3f1 100644 --- a/runners/pom.xml +++ b/runners/pom.xml @@ -36,6 +36,7 @@ <module>core-construction-java</module> <module>core-java</module> <module>java-fn-execution</module> + <module>local-java</module> <module>local-artifact-service-java</module> <module>reference</module> <module>direct-java</module>