[ 
https://issues.apache.org/jira/browse/BEAM-4347?focusedWorklogId=105313&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105313
 ]

ASF GitHub Bot logged work on BEAM-4347:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/May/18 20:29
            Start Date: 23/May/18 20:29
    Worklog Time Spent: 10m 
      Work Description: iemejia closed pull request #5422: [BEAM-4347] Enforce 
ErrorProne analysis in kafka IO
URL: https://github.com/apache/beam/pull/5422
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle
index 77bf209157d..2a9f3495782 100644
--- a/sdks/java/io/kafka/build.gradle
+++ b/sdks/java/io/kafka/build.gradle
@@ -17,13 +17,14 @@
  */
 
 apply from: project(":").file("build_rules.gradle")
-applyJavaNature(enableFindbugs: false)
+applyJavaNature(failOnWarning: true, enableFindbugs: false)
 
 description = "Apache Beam :: SDKs :: Java :: IO :: Kafka"
 ext.summary = "Library to read Kafka topics."
 
 dependencies {
   compile library.java.guava
+  compileOnly library.java.findbugs_annotations
   shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
   shadow library.java.kafka_clients
   shadow library.java.slf4j_api
@@ -37,4 +38,5 @@ dependencies {
   testCompile library.java.hamcrest_core
   testCompile library.java.junit
   testCompile library.java.slf4j_jdk14
+  testCompileOnly library.java.findbugs_annotations
 }
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
index 9ae69da3834..b5d3526d4f1 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
@@ -40,6 +40,7 @@
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
@@ -73,6 +74,7 @@
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -262,6 +264,8 @@ public void setup() {
       KafkaExactlyOnceSink.ensureEOSSupport();
     }
 
+    // Futures ignored as exceptions will be flushed out in the commitTxn
+    @SuppressWarnings("FutureReturnValueIgnored")
     @ProcessElement
     public void processElement(@StateId(NEXT_ID) ValueState<Long> nextIdState,
                                @StateId(MIN_BUFFERED_ID) ValueState<Long> 
minBufferedIdState,
@@ -433,18 +437,20 @@ void beginTxn() {
         ProducerSpEL.beginTransaction(producer);
       }
 
-      void sendRecord(TimestampedValue<KV<K, V>> record, Counter sendCounter) {
+
+      Future<RecordMetadata> sendRecord(TimestampedValue<KV<K, V>> record, 
Counter sendCounter) {
         try {
           Long timestampMillis = spec.getPublishTimestampFunction() != null
             ? 
spec.getPublishTimestampFunction().getTimestamp(record.getValue(),
                                                               
record.getTimestamp()).getMillis()
             : null;
 
-          producer.send(
+          Future<RecordMetadata> result = producer.send(
               new ProducerRecord<>(
                   spec.getTopic(), null, timestampMillis,
                   record.getValue().getKey(), record.getValue().getValue()));
           sendCounter.inc();
+          return result;
         } catch (KafkaException e) {
           ProducerSpEL.abortTransaction(producer);
           throw e;
@@ -573,6 +579,8 @@ void commitTxn(long lastRecordId, Counter numTransactions) 
throws IOException {
 
       private final Cache<Integer, ShardWriter<K, V>> cache;
 
+      // Exceptions arising from the cache cleanup are ignored
+      @SuppressWarnings("FutureReturnValueIgnored")
       ShardWriterCache() {
         this.cache =
           CacheBuilder.newBuilder()
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 0859233aad1..611073e4d58 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1038,6 +1038,7 @@ public PDone expand(PCollection<KV<K, V>> input) {
       return PDone.in(input.getPipeline());
     }
 
+    @Override
     public void validate(PipelineOptions options) {
       if (isEOS()) {
         String runner = options.getRunner().getName();
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index 682b3063781..1387cd8071e 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -23,12 +23,12 @@
 import com.google.common.collect.Iterators;
 import com.google.common.io.Closeables;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -75,6 +75,7 @@
 class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
 
   ///////////////////// Reader API 
////////////////////////////////////////////////////////////
+  @SuppressWarnings("FutureReturnValueIgnored")
   @Override
   public boolean start() throws IOException {
     final int defaultPartitionInitTimeout = 60 * 1000;
@@ -85,9 +86,13 @@ public boolean start() throws IOException {
     consumerSpEL.evaluateAssign(consumer, spec.getTopicPartitions());
 
     try {
-      keyDeserializerInstance = spec.getKeyDeserializer().newInstance();
-      valueDeserializerInstance = spec.getValueDeserializer().newInstance();
-    } catch (InstantiationException | IllegalAccessException e) {
+      keyDeserializerInstance = 
spec.getKeyDeserializer().getDeclaredConstructor().newInstance();
+      valueDeserializerInstance =
+          spec.getValueDeserializer().getDeclaredConstructor().newInstance();
+    } catch (InstantiationException
+        | IllegalAccessException
+        | InvocationTargetException
+        | NoSuchMethodException e) {
       throw new IOException("Could not instantiate deserializers", e);
     }
 
@@ -614,7 +619,7 @@ private void nextBatch() {
     partitionStates.forEach(p -> p.recordIter = 
records.records(p.topicPartition).iterator());
 
     // cycle through the partitions in order to interleave records from each.
-    curBatch = Iterators.cycle(new LinkedList<>(partitionStates));
+    curBatch = Iterators.cycle(new ArrayList<>(partitionStates));
   }
 
   private void setupInitialOffset(PartitionState pState) {
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
index 9f2544a96b0..501563a7a1b 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
@@ -49,7 +49,9 @@ public void setup() {
     }
   }
 
+  // Suppression since errors are tracked in SendCallback(), and checked in 
finishBundle()
   @ProcessElement
+  @SuppressWarnings("FutureReturnValueIgnored")
   public void processElement(ProcessContext ctx) throws Exception {
     checkForFailures();
 
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
index d1114c21f5f..7d1007b4bb7 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
@@ -108,7 +108,7 @@ public void testCustomTimestampPolicyWithLimitedDelay() {
     // (3) Verify that Watermark advances when there is no backlog
 
     // advance current time by 5 minutes
-    now = now.plus(Duration.standardSeconds(300));
+    now = now.plus(Duration.standardMinutes(5));
     Instant backlogCheckTime = now.minus(Duration.standardSeconds(10));
 
     when(ctx.getMessageBacklog()).thenReturn(0L);
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 48c673579a9..931771732c8 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -209,7 +209,7 @@
     final MockConsumer<byte[], byte[]> consumer =
         new MockConsumer<byte[], byte[]>(offsetResetStrategy) {
           @Override
-          public void assign(final Collection<TopicPartition> assigned) {
+          public synchronized void assign(final Collection<TopicPartition> 
assigned) {
             super.assign(assigned);
             assignedPartitions.set(ImmutableList.copyOf(assigned));
             for (TopicPartition tp : assigned) {
@@ -219,7 +219,7 @@ public void assign(final Collection<TopicPartition> 
assigned) {
           }
           // Override offsetsForTimes() in order to look up the offsets by 
timestamp.
           @Override
-          public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
+          public synchronized Map<TopicPartition, OffsetAndTimestamp> 
offsetsForTimes(
             Map<TopicPartition, Long> timestampsToSearch) {
             return timestampsToSearch
               .entrySet()
@@ -529,7 +529,7 @@ public void testUnboundedSourceCustomTimestamps() {
                   (tp, prevWatermark) -> new 
CustomTimestampPolicyWithLimitedDelay<Integer, Long>(
                     (record -> new 
Instant(TimeUnit.SECONDS.toMillis(record.getKV().getValue())
                                              + customTimestampStartMillis)),
-                   Duration.millis(0),
+                   Duration.ZERO,
                    prevWatermark))
                 .withoutMetadata())
         .apply(Values.create());
@@ -558,7 +558,7 @@ public void testUnboundedSourceCreateTimestamps() {
 
     PCollection<Long> input =
       p.apply(mkKafkaReadTransform(numElements, null)
-                .withCreateTime(Duration.millis(0))
+                .withCreateTime(Duration.ZERO)
                 .updateConsumerProperties(ImmutableMap.of(
                   TIMESTAMP_TYPE_CONFIG, "CreateTime",
                   TIMESTAMP_START_MILLIS_CONFIG, createTimestampStartMillis))
@@ -1345,7 +1345,7 @@ private static void 
verifyProducerRecords(MockProducer<Integer, Long> mockProduc
         // ProducerCompletionThread to inject errors.
 
         @Override
-        public void flush() {
+        public synchronized void flush() {
           while (completeNext()) {
             // there are some uncompleted records. let the completion thread 
handle them.
             try {
@@ -1361,6 +1361,7 @@ public void flush() {
       assertNull(MOCK_PRODUCER_MAP.putIfAbsent(producerKey, mockProducer));
     }
 
+    @Override
     public void close() {
       MOCK_PRODUCER_MAP.remove(producerKey);
       try {
@@ -1445,6 +1446,7 @@ public void close() {
       injectorThread = Executors.newSingleThreadExecutor();
     }
 
+    @SuppressWarnings("FutureReturnValueIgnored")
     ProducerSendCompletionThread start() {
       injectorThread.submit(
           () -> {
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoderTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoderTest.java
index a6344a62134..e19bb079f5a 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoderTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoderTest.java
@@ -22,6 +22,7 @@
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -44,7 +45,7 @@ public void testCoderIsSerializableWithWellKnownCoderType() {
   @Test
   public void testKafkaRecordSerializableWithHeaders() throws IOException {
     RecordHeaders headers = new RecordHeaders();
-    headers.add("headerKey", "headerVal".getBytes());
+    headers.add("headerKey", "headerVal".getBytes(StandardCharsets.UTF_8));
     verifySerialization(headers);
   }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 105313)
    Time Spent: 5h 10m  (was: 5h)

> Enforce ErrorProne analysis in the kafka IO project
> ---------------------------------------------------
>
>                 Key: BEAM-4347
>                 URL: https://issues.apache.org/jira/browse/BEAM-4347
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kafka
>            Reporter: Scott Wegner
>            Assignee: Tim Robertson
>            Priority: Minor
>              Labels: errorprone, starter
>          Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> Java ErrorProne static analysis was [recently 
> enabled|https://github.com/apache/beam/pull/5161] in the Gradle build 
> process, but only as warnings. ErrorProne errors are generally useful and 
> easy to fix. Some work was done to [make sdks-java-core 
> ErrorProne-clean|https://github.com/apache/beam/pull/5319] and add 
> enforcement. This task is to clean ErrorProne warnings and add enforcement in 
> {{beam-sdks-java-io-kafka}}. Additional context discussed on the [dev 
> list|https://lists.apache.org/thread.html/95aae2785c3cd728c2d3378cbdff2a7ba19caffcd4faa2049d2e2f46@%3Cdev.beam.apache.org%3E].
> Fixing this issue will involve:
> # Follow instructions in the [Contribution 
> Guide|https://beam.apache.org/contribute/] to set up a {{beam}} development 
> environment.
> # Run the following command to compile and run ErrorProne analysis on the 
> project: {{./gradlew :beam-sdks-java-io-kafka:assemble}}
> # Fix each ErrorProne warning from the {{sdks/java/io/kafka}} project.
> # In {{sdks/java/io/kafka/build.gradle}}, add {{failOnWarning: true}} to the 
> call to {{applyJavaNature()}} 
> ([example|https://github.com/apache/beam/pull/5319/files#diff-9390c20635aed5f42f83b97506a87333R20]).
> This starter issue is sponsored by [~swegner]. Feel free to [reach 
> out|https://beam.apache.org/community/contact-us/] with questions or code 
> review:
> * JIRA: [~swegner]
> * GitHub: [@swegner|https://github.com/swegner]
> * Slack: [@Scott Wegner|https://s.apache.org/beam-slack-channel]
> * Email: swegner at google dot com



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to