Repository: incubator-beam Updated Branches: refs/heads/master d71d828b7 -> c4036753f
[BEAM-634] Be able to import Beam codebase in Eclipse and support m2e Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a0ae04be Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a0ae04be Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a0ae04be Branch: refs/heads/master Commit: a0ae04bef40149cdf54d0ab50909f18a444f3023 Parents: d71d828 Author: Daniel Kulp <dk...@apache.org> Authored: Mon Sep 12 17:31:22 2016 -0400 Committer: Jean-Baptiste Onofré <jbono...@apache.org> Committed: Thu Sep 15 16:26:05 2016 +0200 ---------------------------------------------------------------------- pom.xml | 35 ++++++++++++++++++-- runners/direct-java/pom.xml | 8 ++++- .../direct/BoundedReadEvaluatorFactory.java | 9 ++--- .../direct/ParDoMultiEvaluatorFactory.java | 12 +++---- .../direct/ParDoSingleEvaluatorFactory.java | 9 ++--- .../direct/UnboundedReadEvaluatorFactory.java | 9 ++--- .../apache/beam/runners/flink/package-info.java | 22 ------------ .../src/main/resources/beam/checkstyle.xml | 3 ++ .../src/main/resources/beam/suppressions.xml | 5 +++ .../beam/sdk/coders/IterableLikeCoder.java | 22 ++++++------ .../beam/sdk/coders/protobuf/ProtoCoder.java | 4 ++- .../beam/sdk/util/MergingActiveWindowSet.java | 12 ++++--- .../org/apache/beam/sdk/util/PubsubClient.java | 2 +- .../beam/sdk/util/PubsubJsonClientTest.java | 16 ++++----- .../apache/beam/sdk/io/kinesis/KinesisIO.java | 4 ++- .../beam/sdk/io/kinesis/KinesisUploader.java | 3 +- .../beam/sdk/io/kinesis/package-info.java | 22 ------------ .../beam/sdk/io/mongodb/package-info.java | 22 ------------ 18 files changed, 102 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 01b5a88..fb3a8a4 100644 --- a/pom.xml +++ b/pom.xml @@ -130,6 +130,8 @@ <stax2.version>3.1.4</stax2.version> <storage.version>v1-rev71-1.22.0</storage.version> <woodstox.version>4.4.1</woodstox.version> + + <compiler.error.flag>-Werror</compiler.error.flag> </properties> <packaging>pom</packaging> @@ -225,6 +227,33 @@ </properties> </profile> + <profile> + <id>eclipse-jdt</id> + <properties> + <!-- Tycho doesn't support -Werror --> + <compiler.error.flag>-Xlint:all</compiler.error.flag> + </properties> + <build> + <pluginManagement> + <plugins> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <compilerId>jdt</compilerId> + </configuration> + <dependencies> + <dependency> + <groupId>org.eclipse.tycho</groupId> + <artifactId>tycho-compiler-jdt</artifactId> + <!-- 0.24.0 is last version to support Java7 --> + <version>0.24.0</version> + </dependency> + </dependencies> + </plugin> + </plugins> + </pluginManagement> + </build> + </profile> </profiles> <dependencyManagement> @@ -681,7 +710,7 @@ <dependency> <groupId>com.puppycrawl.tools</groupId> <artifactId>checkstyle</artifactId> - <version>6.17</version> + <version>6.19</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> @@ -716,13 +745,13 @@ <plugin> <artifactId>maven-compiler-plugin</artifactId> - <version>3.1</version> + <version>3.3</version> <configuration> <source>1.7</source> <target>1.7</target> <compilerArgs> <arg>-Xlint:all</arg> - <arg>-Werror</arg> + <arg>${compiler.error.flag}</arg> <!-- Override options warnings to support cross-compilation --> <arg>-Xlint:-options</arg> <!-- Temporary lint overrides, to be removed over time. --> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/runners/direct-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml index e06883f..354c8c7 100644 --- a/runners/direct-java/pom.xml +++ b/runners/direct-java/pom.xml @@ -361,6 +361,12 @@ </exclusions> <scope>test</scope> </dependency> - + + <!-- needed for eclipse-jdt generated core as the test-jar references classes from this --> + <dependency> + <groupId>com.google.cloud.dataflow</groupId> + <artifactId>google-cloud-dataflow-java-proto-library-all</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java index 2046d31..9c77946 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java @@ -73,7 +73,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { * already done so. */ private <OutputT> TransformEvaluator<?> getTransformEvaluator( - final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform) { + final AppliedPTransform<?, PCollection<OutputT>, ?> transform) { // Key by the application and the context the evaluation is occurring in (which call to // Pipeline#run). Queue<BoundedReadEvaluator<OutputT>> evaluatorQueue = @@ -83,7 +83,8 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { if (sourceEvaluators.putIfAbsent(transform, evaluatorQueue) == null) { // If no queue existed in the evaluators, add an evaluator to initialize the evaluator // factory for this transform - BoundedSource<OutputT> source = transform.getTransform().getSource(); + Bounded<OutputT> bound = (Bounded<OutputT>) transform.getTransform(); + BoundedSource<OutputT> source = bound.getSource(); BoundedReadEvaluator<OutputT> evaluator = new BoundedReadEvaluator<OutputT>(transform, evaluationContext, source); evaluatorQueue.offer(evaluator); @@ -105,7 +106,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { * may produce duplicate elements. */ private static class BoundedReadEvaluator<OutputT> implements TransformEvaluator<Object> { - private final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform; + private final AppliedPTransform<?, PCollection<OutputT>, ?> transform; private final EvaluationContext evaluationContext; /** * The source being read from by this {@link BoundedReadEvaluator}. This may not be the same as @@ -114,7 +115,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { private BoundedSource<OutputT> source; public BoundedReadEvaluator( - AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform, + AppliedPTransform<?, PCollection<OutputT>, ?> transform, EvaluationContext evaluationContext, BoundedSource<OutputT> source) { this.transform = transform; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java index fcb68c4..d909e8b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java @@ -1,5 +1,5 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one +* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file @@ -39,18 +39,18 @@ import org.slf4j.LoggerFactory; */ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory { private static final Logger LOG = LoggerFactory.getLogger(ParDoMultiEvaluatorFactory.class); - private final LoadingCache<AppliedPTransform<?, ?, BoundMulti<?, ?>>, DoFnLifecycleManager> - fnClones; + private final LoadingCache<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager> fnClones; private final EvaluationContext evaluationContext; public ParDoMultiEvaluatorFactory(EvaluationContext evaluationContext) { this.evaluationContext = evaluationContext; fnClones = CacheBuilder.newBuilder() - .build(new CacheLoader<AppliedPTransform<?, ?, BoundMulti<?, ?>>, DoFnLifecycleManager>() { + .build(new CacheLoader<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager>() { @Override - public DoFnLifecycleManager load(AppliedPTransform<?, ?, BoundMulti<?, ?>> key) + public DoFnLifecycleManager load(AppliedPTransform<?, ?, ?> key) throws Exception { - return DoFnLifecycleManager.of(key.getTransform().getFn()); + BoundMulti<?, ?> bound = (BoundMulti<?, ?>) key.getTransform(); + return DoFnLifecycleManager.of(bound.getFn()); } }); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java index 91da35f..1a06ea6 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java @@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory; */ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory { private static final Logger LOG = LoggerFactory.getLogger(ParDoSingleEvaluatorFactory.class); - private final LoadingCache<AppliedPTransform<?, ?, Bound<?, ?>>, DoFnLifecycleManager> fnClones; + private final LoadingCache<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager> fnClones; private final EvaluationContext evaluationContext; public ParDoSingleEvaluatorFactory(EvaluationContext evaluationContext) { @@ -47,11 +47,12 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory { fnClones = CacheBuilder.newBuilder() .build( - new CacheLoader<AppliedPTransform<?, ?, Bound<?, ?>>, DoFnLifecycleManager>() { + new CacheLoader<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager>() { @Override - public DoFnLifecycleManager load(AppliedPTransform<?, ?, Bound<?, ?>> key) + public DoFnLifecycleManager load(AppliedPTransform<?, ?, ?> key) throws Exception { - return DoFnLifecycleManager.of(key.getTransform().getFn()); + Bound<?, ?> bound = (Bound<?, ?>) key.getTransform(); + return DoFnLifecycleManager.of(bound.getFn()); } }); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java index 0dfcd69..9fb3dbf 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -82,7 +82,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { */ private <OutputT, CheckpointMarkT extends CheckpointMark> TransformEvaluator<?> getTransformEvaluator( - final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform) { + final AppliedPTransform<?, PCollection<OutputT>, ?> transform) { ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> evaluatorQueue = (ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>>) sourceEvaluators.get(transform); @@ -91,8 +91,9 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { if (sourceEvaluators.putIfAbsent(transform, evaluatorQueue) == null) { // If no queue existed in the evaluators, add an evaluator to initialize the evaluator // factory for this transform + Unbounded<OutputT> unbounded = (Unbounded<OutputT>) transform.getTransform(); UnboundedSource<OutputT, CheckpointMarkT> source = - (UnboundedSource<OutputT, CheckpointMarkT>) transform.getTransform().getSource(); + (UnboundedSource<OutputT, CheckpointMarkT>) unbounded.getSource(); UnboundedReadDeduplicator deduplicator; if (source.requiresDeduping()) { deduplicator = UnboundedReadDeduplicator.CachedIdDeduplicator.create(); @@ -130,7 +131,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { implements TransformEvaluator<Object> { private static final int ARBITRARY_MAX_ELEMENTS = 10; - private final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform; + private final AppliedPTransform<?, PCollection<OutputT>, ?> transform; private final EvaluationContext evaluationContext; private final ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> evaluatorQueue; @@ -151,7 +152,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { private int outputBundles = 0; public UnboundedReadEvaluator( - AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform, + AppliedPTransform<?, PCollection<OutputT>, ?> transform, EvaluationContext evaluationContext, UnboundedSource<OutputT, CheckpointMarkT> source, UnboundedReadDeduplicator deduplicator, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java deleted file mode 100644 index 57f1e59..0000000 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Internal implementation of the Beam runner for Apache Flink. - */ -package org.apache.beam.runners.flink; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml ---------------------------------------------------------------------- diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml index c7d9b2c..a3313ca 100644 --- a/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml +++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml @@ -59,6 +59,9 @@ page at http://checkstyle.sourceforge.net/config.html --> <property name="onCommentFormat" value="CHECKSTYLE.ON\: ([\w\|]+)"/> <property name="checkFormat" value="$1"/> </module> + <module name="SuppressionFilter"> + <property name="file" value="${checkstyle.suppressions.file}" /> + </module> <!-- Check that every module has a package-info.java --> <module name="JavadocPackage"/> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/build-tools/src/main/resources/beam/suppressions.xml ---------------------------------------------------------------------- diff --git a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml index 6dbb7f5..00d6729 100644 --- a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml +++ b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml @@ -20,5 +20,10 @@ <suppress checks="JavadocPackage" files=".*/src/test/.*"/> <suppress checks="JavadocPackage" files=".*/maven-archetypes/.*"/> <suppress checks="JavadocPackage" files=".*/examples/.*"/> + + <!-- suppress all checks in the generated directories --> + <suppress checks=".*" files=".+[\\\/]generated[\\\/].+\.java" /> + <suppress checks=".*" files=".+[\\\/]generated-sources[\\\/].+\.java" /> + <suppress checks=".*" files=".+[\\\/]generated-test-sources[\\\/].+\.java" /> </suppressions> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java index 8680552..da64a93 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java @@ -140,19 +140,19 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>> elements.add(elementCoder.decode(dataInStream, nestedContext)); } return decodeToIterable(elements); - } else { - List<T> elements = new ArrayList<>(); - long count; - // We don't know the size a priori. Check if we're done with - // each block of elements. - while ((count = VarInt.decodeLong(dataInStream)) > 0) { - while (count > 0) { - elements.add(elementCoder.decode(dataInStream, nestedContext)); - count -= 1; - } + } + List<T> elements = new ArrayList<>(); + // We don't know the size a priori. Check if we're done with + // each block of elements. + long count = VarInt.decodeLong(dataInStream); + while (count > 0L) { + elements.add(elementCoder.decode(dataInStream, nestedContext)); + --count; + if (count == 0L) { + count = VarInt.decodeLong(dataInStream); } - return decodeToIterable(elements); } + return decodeToIterable(elements); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java index 79fb373..9bba42b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java @@ -364,6 +364,8 @@ public class ProtoCoder<T extends Message> extends AtomicCoder<T> { return memoizedParser; } + static final TypeDescriptor<Message> CHECK = new TypeDescriptor<Message>() {}; + /** * The implementation of the {@link CoderProvider} for this {@link ProtoCoder} returned by * {@link #coderProvider()}. @@ -372,7 +374,7 @@ public class ProtoCoder<T extends Message> extends AtomicCoder<T> { new CoderProvider() { @Override public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException { - if (!type.isSubtypeOf(new TypeDescriptor<Message>() {})) { + if (!type.isSubtypeOf(CHECK)) { throw new CannotProvideCoderException( String.format( "Cannot provide %s because %s is not a subclass of %s", http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java index 6ad63b0..066579b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java @@ -144,7 +144,7 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi checkState(stateAddressWindows != null, "Cannot ensure window %s is active since it is neither ACTIVE nor NEW", window); - if (stateAddressWindows.isEmpty()) { + if (stateAddressWindows != null && stateAddressWindows.isEmpty()) { // Window was NEW, make it ACTIVE with itself as its state address window. stateAddressWindows.add(window); } @@ -266,10 +266,12 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi checkState(otherStateAddressWindows != null, "Window %s is not ACTIVE or NEW", other); - for (W otherStateAddressWindow : otherStateAddressWindows) { - // Since otherTarget equiv other AND other equiv mergeResult - // THEN otherTarget equiv mergeResult. - newStateAddressWindows.add(otherStateAddressWindow); + if (otherStateAddressWindows != null) { + for (W otherStateAddressWindow : otherStateAddressWindows) { + // Since otherTarget equiv other AND other equiv mergeResult + // THEN otherTarget equiv mergeResult. + newStateAddressWindows.add(otherStateAddressWindow); + } } activeWindowToStateAddressWindows.remove(other); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java index fdcee16..bb6aa93 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java @@ -114,7 +114,7 @@ public abstract class PubsubClient implements Closeable { "Cannot interpret value of label %s as timestamp: %s", timestampLabel, value); } - return timestampMsSinceEpoch; + return timestampMsSinceEpoch == null ? 0 : timestampMsSinceEpoch; } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java index b6d7ccb..72fb9a2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java @@ -93,10 +93,10 @@ public class PubsubJsonClientTest { .setAckId(ACK_ID); PullResponse expectedResponse = new PullResponse().setReceivedMessages(ImmutableList.of(expectedReceivedMessage)); - Mockito.when(mockPubsub.projects() - .subscriptions() - .pull(expectedSubscription, expectedRequest) - .execute()) + Mockito.when((Object) (mockPubsub.projects() + .subscriptions() + .pull(expectedSubscription, expectedRequest) + .execute())) .thenReturn(expectedResponse); List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true); assertEquals(1, acutalMessages.size()); @@ -120,10 +120,10 @@ public class PubsubJsonClientTest { .setMessages(ImmutableList.of(expectedPubsubMessage)); PublishResponse expectedResponse = new PublishResponse() .setMessageIds(ImmutableList.of(MESSAGE_ID)); - Mockito.when(mockPubsub.projects() - .topics() - .publish(expectedTopic, expectedRequest) - .execute()) + Mockito.when((Object) (mockPubsub.projects() + .topics() + .publish(expectedTopic, expectedRequest) + .execute())) .thenReturn(expectedResponse); OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME, RECORD_ID); int n = client.publish(TOPIC, ImmutableList.of(actualMessage)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java index 811051c..acff33f 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java @@ -183,7 +183,9 @@ public final class KinesisIO { @Override public AmazonKinesis get() { - return new AmazonKinesisClient(getCredentialsProvider()).withRegion(region); + AmazonKinesisClient client = new AmazonKinesisClient(getCredentialsProvider()); + client.withRegion(region); + return client; } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java index c98242b..b1c212b 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java @@ -22,7 +22,6 @@ import static com.google.common.collect.Lists.newArrayList; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.internal.StaticCredentialsProvider; import com.amazonaws.regions.Regions; -import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.model.PutRecordsRequest; import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry; @@ -41,7 +40,7 @@ public class KinesisUploader { public static final int MAX_NUMBER_OF_RECORDS_IN_BATCH = 499; public static void uploadAll(List<String> data, KinesisTestOptions options) { - AmazonKinesis client = new AmazonKinesisClient( + AmazonKinesisClient client = new AmazonKinesisClient( new StaticCredentialsProvider( new BasicAWSCredentials( options.getAwsAccessKey(), options.getAwsSecretKey())) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java deleted file mode 100644 index 44dbf4a..0000000 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Transforms for reading and writing from Amazon Kinesis. - */ -package org.apache.beam.sdk.io.kinesis; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java deleted file mode 100644 index fd08b58..0000000 --- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Transforms for reading and writing from MongoDB. - */ -package org.apache.beam.sdk.io.mongodb;