[ 
https://issues.apache.org/jira/browse/BEAM-4348?focusedWorklogId=104789&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104789
 ]

ASF GitHub Bot logged work on BEAM-4348:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/May/18 21:18
            Start Date: 22/May/18 21:18
    Worklog Time Spent: 10m 
      Work Description: iemejia closed pull request #5448: [BEAM-4348] Enforce 
ErrorProne analysis in kinesis IO
URL: https://github.com/apache/beam/pull/5448
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/io/kinesis/build.gradle 
b/sdks/java/io/kinesis/build.gradle
index 05e49569185..4df04a26efa 100644
--- a/sdks/java/io/kinesis/build.gradle
+++ b/sdks/java/io/kinesis/build.gradle
@@ -17,7 +17,7 @@
  */
 
 apply from: project(":").file("build_rules.gradle")
-applyJavaNature()
+applyJavaNature(failOnWarning: true)
 provideIntegrationTestingDependencies()
 enableJavaPerformanceTesting()
 
@@ -33,6 +33,7 @@ def aws_version = "1.11.255"
 
 dependencies {
   compile library.java.guava
+  compileOnly library.java.findbugs_annotations
   shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
   shadow library.java.slf4j_api
   shadow library.java.joda_time
@@ -51,4 +52,5 @@ dependencies {
   testCompile library.java.hamcrest_core
   testCompile library.java.slf4j_jdk14
   testCompile "org.assertj:assertj-core:2.5.0"
+  testCompileOnly library.java.findbugs_annotations
 }
diff --git 
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
 
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
index 67611a935a0..578c3516c76 100644
--- 
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
+++ 
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
@@ -106,6 +106,8 @@ void start() throws TransientKinesisException {
     }
   }
 
+  // Note: readLoop() will log any Throwable raised so opt to ignore the 
future result
+  @SuppressWarnings("FutureReturnValueIgnored")
   void startReadingShards(Iterable<ShardRecordsIterator> 
shardRecordsIterators) {
     for (final ShardRecordsIterator recordsIterator : shardRecordsIterators) {
       numberOfRecordsInAQueueByShard.put(recordsIterator.getShardId(), new 
AtomicInteger());
diff --git 
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
 
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
index f4906bdad86..7c5ca7da4db 100644
--- 
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
+++ 
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
@@ -80,8 +80,10 @@
 import com.amazonaws.services.kinesis.producer.IKinesisProducer;
 import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
 import com.amazonaws.services.kinesis.waiters.AmazonKinesisWaiters;
+import com.google.common.base.Splitter;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -101,7 +103,7 @@
     private final String sequenceNumber;
 
     public TestData(KinesisRecord record) {
-      this(new String(record.getData().array()),
+      this(new String(record.getData().array(), StandardCharsets.UTF_8),
           record.getApproximateArrivalTimestamp(),
           record.getSequenceNumber());
     }
@@ -115,7 +117,7 @@ public TestData(String data, Instant arrivalTimestamp, 
String sequenceNumber) {
     public Record convertToRecord() {
       return new Record().
           withApproximateArrivalTimestamp(arrivalTimestamp.toDate()).
-          withData(ByteBuffer.wrap(data.getBytes())).
+          withData(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8))).
           withSequenceNumber(sequenceNumber).
           withPartitionKey("");
     }
@@ -176,9 +178,10 @@ public AmazonKinesisMock(List<List<Record>> shardedData, 
int numberOfRecordsPerG
 
   @Override
   public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) {
-    String[] shardIteratorParts = 
getRecordsRequest.getShardIterator().split(":");
-    int shardId = parseInt(shardIteratorParts[0]);
-    int startingRecord = parseInt(shardIteratorParts[1]);
+    List<String> shardIteratorParts =
+        Splitter.on(':').splitToList(getRecordsRequest.getShardIterator());
+    int shardId = parseInt(shardIteratorParts.get(0));
+    int startingRecord = parseInt(shardIteratorParts.get(1));
     List<Record> shardData = shardedData.get(shardId);
 
     int toIndex = min(startingRecord + numberOfRecordsPerGet, 
shardData.size());
diff --git 
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
 
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
index c439393549c..ea5132912ca 100644
--- 
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
+++ 
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
@@ -22,6 +22,7 @@
 import com.amazonaws.regions.Regions;
 import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
 import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Random;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -106,7 +107,7 @@ public void processElement(ProcessContext c) {
   private List<byte[]> prepareData() {
     List<byte[]> data = newArrayList();
     for (int i = 0; i < NUM_RECORDS; i++) {
-      data.add(String.valueOf(i).getBytes());
+      data.add(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
     }
     return data;
   }
diff --git 
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockWriteTest.java
 
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockWriteTest.java
index 3f3cc0a24c9..4d7c59733d9 100644
--- 
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockWriteTest.java
+++ 
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockWriteTest.java
@@ -31,6 +31,7 @@
 import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Properties;
 import org.apache.beam.sdk.testing.PAssert;
@@ -131,7 +132,7 @@ public void 
testWriteValidationFailsMissingAWSClientsProvider() {
 
   @Test
   public void testNotExistedStream() {
-    Iterable<byte[]> data = ImmutableList.of("1".getBytes());
+    Iterable<byte[]> data = 
ImmutableList.of("1".getBytes(StandardCharsets.UTF_8));
     p.apply(Create.of(data))
         .apply(
             KinesisIO.write()
@@ -149,7 +150,7 @@ public void testSetInvalidProperty() {
     Properties properties = new Properties();
     properties.setProperty("KinesisPort", "qwe");
 
-    Iterable<byte[]> data = ImmutableList.of("1".getBytes());
+    Iterable<byte[]> data = 
ImmutableList.of("1".getBytes(StandardCharsets.UTF_8));
     p.apply(Create.of(data))
         .apply(
             KinesisIO.write()
@@ -171,7 +172,11 @@ public void testWrite() {
     properties.setProperty("KinesisPort", "4567");
     properties.setProperty("VerifyCertificate", "false");
 
-    Iterable<byte[]> data = ImmutableList.of("1".getBytes(), "2".getBytes(), 
"3".getBytes());
+    Iterable<byte[]> data =
+        ImmutableList.of(
+            "1".getBytes(StandardCharsets.UTF_8),
+            "2".getBytes(StandardCharsets.UTF_8),
+            "3".getBytes(StandardCharsets.UTF_8));
     p.apply(Create.of(data))
         .apply(
             KinesisIO.write()
@@ -186,7 +191,7 @@ public void testWrite() {
 
   @Test
   public void testWriteFailed() {
-    Iterable<byte[]> data = ImmutableList.of("1".getBytes());
+    Iterable<byte[]> data = 
ImmutableList.of("1".getBytes(StandardCharsets.UTF_8));
     p.apply(Create.of(data))
         .apply(
             KinesisIO.write()
@@ -203,7 +208,9 @@ public void testWriteFailed() {
   public void testWriteAndReadFromMockKinesis() {
     KinesisServiceMock kinesisService = KinesisServiceMock.getInstance();
 
-    Iterable<byte[]> data = ImmutableList.of("1".getBytes(), "2".getBytes());
+    Iterable<byte[]> data =
+        ImmutableList.of(
+            "1".getBytes(StandardCharsets.UTF_8), 
"2".getBytes(StandardCharsets.UTF_8));
     p.apply(Create.of(data))
         .apply(
             KinesisIO.write()
diff --git 
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
 
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
index ff6232aee2c..05ab6450eba 100644
--- 
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
+++ 
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.kinesis;
 
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.joda.time.Instant;
 import org.junit.Test;
@@ -30,7 +31,7 @@
   @Test
   public void encodingAndDecodingWorks() throws Exception {
     KinesisRecord record = new KinesisRecord(
-        ByteBuffer.wrap("data".getBytes()),
+        ByteBuffer.wrap("data".getBytes(StandardCharsets.UTF_8)),
         "sequence",
         128L,
         "partition",
diff --git 
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisServiceMock.java
 
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisServiceMock.java
index 1ff0291d9b4..ae2c34a2aec 100644
--- 
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisServiceMock.java
+++ 
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisServiceMock.java
@@ -38,13 +38,9 @@
 
   private KinesisServiceMock() {}
 
-  public static KinesisServiceMock getInstance() {
+  public static synchronized KinesisServiceMock getInstance() {
     if (instance == null) {
-      synchronized (KinesisServiceMock.class) {
-        if (instance == null) {
-          instance = new KinesisServiceMock();
-        }
-      }
+      instance = new KinesisServiceMock();
     }
     return instance;
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 104789)
    Time Spent: 50m  (was: 40m)

> Enforce ErrorProne analysis in the kinesis IO project
> -----------------------------------------------------
>
>                 Key: BEAM-4348
>                 URL: https://issues.apache.org/jira/browse/BEAM-4348
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kinesis
>            Reporter: Scott Wegner
>            Assignee: Tim Robertson
>            Priority: Minor
>              Labels: errorprone, starter
>             Fix For: 2.5.0
>
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> Java ErrorProne static analysis was [recently 
> enabled|https://github.com/apache/beam/pull/5161] in the Gradle build 
> process, but only as warnings. ErrorProne errors are generally useful and 
> easy to fix. Some work was done to [make sdks-java-core 
> ErrorProne-clean|https://github.com/apache/beam/pull/5319] and add 
> enforcement. This task is to clean ErrorProne warnings and add enforcement in 
> {{beam-sdks-java-io-kinesis}}. Additional context is discussed on the [dev 
> list|https://lists.apache.org/thread.html/95aae2785c3cd728c2d3378cbdff2a7ba19caffcd4faa2049d2e2f46@%3Cdev.beam.apache.org%3E].
> Fixing this issue will involve:
> # Follow instructions in the [Contribution 
> Guide|https://beam.apache.org/contribute/] to set up a {{beam}} development 
> environment.
> # Run the following command to compile and run ErrorProne analysis on the 
> project: {{./gradlew :beam-sdks-java-io-kinesis:assemble}}
> # Fix each ErrorProne warning from the {{sdks/java/io/kinesis}} project.
> # In {{sdks/java/io/kinesis/build.gradle}}, add {{failOnWarning: true}} to 
> the call to {{applyJavaNature()}} 
> ([example|https://github.com/apache/beam/pull/5319/files#diff-9390c20635aed5f42f83b97506a87333R20]).
> This starter issue is sponsored by [~swegner]. Feel free to [reach 
> out|https://beam.apache.org/community/contact-us/] with questions or code 
> review:
> * JIRA: [~swegner]
> * GitHub: [@swegner|https://github.com/swegner]
> * Slack: [@Scott Wegner|https://s.apache.org/beam-slack-channel]
> * Email: swegner at google dot com



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to