[
https://issues.apache.org/jira/browse/BEAM-4342?focusedWorklogId=105514&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105514
]
ASF GitHub Bot logged work on BEAM-4342:
----------------------------------------
Author: ASF GitHub Bot
Created on: 24/May/18 09:21
Start Date: 24/May/18 09:21
Worklog Time Spent: 10m
Work Description: iemejia closed pull request #5411: [BEAM-4342] Enforce
ErrorProne analysis in hadoop IO
URL: https://github.com/apache/beam/pull/5411
This is a PR merged from a forked repository. Because GitHub does not show
the original diff for a pull request merged from a fork, it is reproduced
below for the sake of provenance:
diff --git a/sdks/java/io/hadoop-common/build.gradle
b/sdks/java/io/hadoop-common/build.gradle
index 1ee3b7cbe6e..46454a3e838 100644
--- a/sdks/java/io/hadoop-common/build.gradle
+++ b/sdks/java/io/hadoop-common/build.gradle
@@ -17,12 +17,13 @@
*/
apply from: project(":").file("build_rules.gradle")
-applyJavaNature()
+applyJavaNature(failOnWarning: true)
description = "Apache Beam :: SDKs :: Java :: IO :: Hadoop Common"
ext.summary = "Library to add shared Hadoop classes among Beam IOs."
dependencies {
+ compileOnly library.java.findbugs_annotations
shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
shadow library.java.findbugs_jsr305
provided library.java.hadoop_client
@@ -31,4 +32,5 @@ dependencies {
testCompile library.java.commons_lang3
testCompile library.java.hamcrest_core
testCompile library.java.junit
+ testCompileOnly library.java.findbugs_annotations
}
diff --git a/sdks/java/io/hadoop-file-system/build.gradle
b/sdks/java/io/hadoop-file-system/build.gradle
index ff478380b58..493909edcda 100644
--- a/sdks/java/io/hadoop-file-system/build.gradle
+++ b/sdks/java/io/hadoop-file-system/build.gradle
@@ -17,13 +17,14 @@
*/
apply from: project(":").file("build_rules.gradle")
-applyJavaNature()
+applyJavaNature(failOnWarning: true)
description = "Apache Beam :: SDKs :: Java :: IO :: Hadoop File System"
ext.summary = "Library to read and write Hadoop/HDFS file formats from Beam."
dependencies {
compile library.java.guava
+ compileOnly library.java.findbugs_annotations
shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
shadow library.java.jackson_core
shadow library.java.jackson_databind
@@ -40,4 +41,5 @@ dependencies {
testCompile library.java.junit
testCompile library.java.hadoop_minicluster
testCompile library.java.hadoop_hdfs_tests
+ testCompileOnly library.java.findbugs_annotations
}
diff --git a/sdks/java/io/hadoop-input-format/build.gradle
b/sdks/java/io/hadoop-input-format/build.gradle
index e9243f5badb..468636a0a95 100644
--- a/sdks/java/io/hadoop-input-format/build.gradle
+++ b/sdks/java/io/hadoop-input-format/build.gradle
@@ -17,7 +17,7 @@
*/
apply from: project(":").file("build_rules.gradle")
-applyJavaNature()
+applyJavaNature(failOnWarning: true)
provideIntegrationTestingDependencies()
enableJavaPerformanceTesting()
@@ -41,6 +41,7 @@ configurations.testRuntimeClasspath {
dependencies {
shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
compile library.java.guava
+ compileOnly library.java.findbugs_annotations
shadow library.java.slf4j_api
shadow library.java.findbugs_jsr305
shadow project(path: ":beam-sdks-java-io-hadoop-common", configuration:
"shadow")
@@ -74,6 +75,7 @@ dependencies {
testCompile "org.apache.logging.log4j:log4j-core:$log4j_version"
testCompile library.java.junit
shadow library.java.commons_io_2x
+ testCompileOnly library.java.findbugs_annotations
}
// The cassandra.yaml file currently assumes "target/..." exists.
diff --git
a/sdks/java/io/hadoop-input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
b/sdks/java/io/hadoop-input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
index 0ffd402320d..e29ad99573d 100644
---
a/sdks/java/io/hadoop-input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
+++
b/sdks/java/io/hadoop-input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
@@ -24,6 +24,7 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
@@ -234,6 +235,7 @@
}
/** Reads from the source using the options provided by the given
configuration. */
+ @SuppressWarnings("unchecked")
public Read<K, V> withConfiguration(Configuration configuration) {
validateConfiguration(configuration);
TypeDescriptor<?> inputFormatClass =
@@ -352,6 +354,7 @@ private void validateTranslationFunction(TypeDescriptor<?>
inputType,
* coder, if not found in Coder Registry, then check if the type
descriptor provided is of type
* Writable, then WritableCoder is returned, else exception is thrown
"Cannot find coder".
*/
+ @SuppressWarnings({"unchecked", "WeakerAccess"})
public <T> Coder<T> getDefaultCoder(TypeDescriptor<?> typeDesc,
CoderRegistry coderRegistry) {
Class classType = typeDesc.getRawType();
try {
@@ -411,6 +414,7 @@ private void validateTranslationFunction(TypeDescriptor<?>
inputType,
null);
}
+ @SuppressWarnings("WeakerAccess")
protected HadoopInputFormatBoundedSource(
SerializableConfiguration conf,
Coder<K> keyCoder,
@@ -426,6 +430,7 @@ protected HadoopInputFormatBoundedSource(
this.valueTranslationFunction = valueTranslationFunction;
}
+ @SuppressWarnings("WeakerAccess")
public SerializableConfiguration getConfiguration() {
return conf;
}
@@ -472,15 +477,13 @@ public void populateDisplayData(DisplayData.Builder
builder) {
.stream()
.map(
serializableInputSplit -> {
- HadoopInputFormatBoundedSource<K, V> hifBoundedSource =
- new HadoopInputFormatBoundedSource<>(
- conf,
- keyCoder,
- valueCoder,
- keyTranslationFunction,
- valueTranslationFunction,
- serializableInputSplit);
- return hifBoundedSource;
+ return new HadoopInputFormatBoundedSource<>(
+ conf,
+ keyCoder,
+ valueCoder,
+ keyTranslationFunction,
+ valueTranslationFunction,
+ serializableInputSplit);
})
.collect(Collectors.toList());
}
@@ -531,6 +534,7 @@ void computeSplitsIfNecessary() throws IOException,
InterruptedException {
* Creates instance of InputFormat class. The InputFormat class name is
specified in the Hadoop
* configuration.
*/
+ @SuppressWarnings("WeakerAccess")
protected void createInputFormatInstance() throws IOException {
if (inputFormatObj == null) {
try {
@@ -541,6 +545,7 @@ protected void createInputFormatInstance() throws
IOException {
.get()
.getClassByName(
conf.get().get("mapreduce.job.inputformat.class"))
+ .getConstructor()
.newInstance();
/*
* If InputFormat explicitly implements interface {@link
Configurable}, then setConf()
@@ -552,7 +557,11 @@ protected void createInputFormatInstance() throws
IOException {
if (Configurable.class.isAssignableFrom(inputFormatObj.getClass())) {
((Configurable) inputFormatObj).setConf(conf.get());
}
- } catch (InstantiationException | IllegalAccessException |
ClassNotFoundException e) {
+ } catch (InstantiationException
+ | IllegalAccessException
+ | ClassNotFoundException
+ | NoSuchMethodException
+ | InvocationTargetException e) {
throw new IOException("Unable to create InputFormat object: ", e);
}
}
@@ -604,13 +613,15 @@ void setInputFormatObj(InputFormat<?, ?> inputFormatObj) {
private final SerializableSplit split;
private RecordReader<T1, T2> recordReader;
private volatile boolean doneReading = false;
- private AtomicLong recordsReturned = new AtomicLong();
+ private final AtomicLong recordsReturned = new AtomicLong();
// Tracks the progress of the RecordReader.
- private AtomicDouble progressValue = new AtomicDouble();
- private transient InputFormat<T1, T2> inputFormatObj;
- private transient TaskAttemptContext taskAttemptContext;
+ private final AtomicDouble progressValue = new AtomicDouble();
+ private final transient InputFormat<T1, T2> inputFormatObj;
+ private final transient TaskAttemptContext taskAttemptContext;
- private HadoopInputFormatReader(HadoopInputFormatBoundedSource<K, V>
source,
+ @SuppressWarnings("unchecked")
+ private HadoopInputFormatReader(
+ HadoopInputFormatBoundedSource<K, V> source,
@Nullable SimpleFunction keyTranslationFunction,
@Nullable SimpleFunction valueTranslationFunction,
SerializableSplit split,
@@ -696,12 +707,14 @@ public boolean advance() throws IOException {
/**
* Returns the serialized output of transformed key or value object.
+ *
* @throws ClassCastException
* @throws CoderException
*/
- private <T, T3> T3 transformKeyOrValue(T input,
- @Nullable SimpleFunction<T, T3> simpleFunction, Coder<T3> coder)
throws CoderException,
- ClassCastException {
+ @SuppressWarnings("unchecked")
+ private <T, T3> T3 transformKeyOrValue(
+ T input, @Nullable SimpleFunction<T, T3> simpleFunction, Coder<T3>
coder)
+ throws CoderException, ClassCastException {
T3 output;
if (null != simpleFunction) {
output = simpleFunction.apply(input);
@@ -777,9 +790,9 @@ public final long getSplitPointsRemaining() {
if (doneReading) {
return 0;
}
- /**
- * This source does not currently support dynamic work rebalancing, so
remaining parallelism
- * is always 1.
+ /*
+ This source does not currently support dynamic work rebalancing, so
remaining parallelism
+ is always 1.
*/
return 1;
}
@@ -802,11 +815,12 @@ public SerializableSplit(InputSplit split) {
this.inputSplit = split;
}
+ @SuppressWarnings("WeakerAccess")
public InputSplit getSplit() {
return inputSplit;
}
- private void readObject(ObjectInputStream in) throws IOException,
ClassNotFoundException {
+ private void readObject(ObjectInputStream in) throws IOException {
ObjectWritable ow = new ObjectWritable();
ow.setConf(new Configuration(false));
ow.readFields(in);
diff --git
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/ConfigurableEmployeeInputFormat.java
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/ConfigurableEmployeeInputFormat.java
index 01eecd194aa..53e2a3e4e24 100644
---
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/ConfigurableEmployeeInputFormat.java
+++
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/ConfigurableEmployeeInputFormat.java
@@ -34,7 +34,7 @@
* Configurable. This validates if setConf() method is called before
getSplits(). Known InputFormats
* which implement Configurable are DBInputFormat, TableInputFormat etc.
*/
-public class ConfigurableEmployeeInputFormat extends InputFormat<Text,
Employee> implements
+class ConfigurableEmployeeInputFormat extends InputFormat<Text, Employee>
implements
Configurable {
public boolean isConfSet = false;
@@ -55,7 +55,7 @@ public void setConf(Configuration conf) {
@Override
public RecordReader<Text, Employee> createRecordReader(InputSplit split,
- TaskAttemptContext context) throws IOException, InterruptedException {
+ TaskAttemptContext context) {
return new ConfigurableEmployeeRecordReader();
}
@@ -64,7 +64,7 @@ public void setConf(Configuration conf) {
* {@link #setConf()} is not called.
*/
@Override
- public List<InputSplit> getSplits(JobContext context) throws IOException,
InterruptedException {
+ public List<InputSplit> getSplits(JobContext context) throws IOException {
if (!isConfSet) {
throw new IOException("Configuration is not set.");
}
@@ -76,21 +76,21 @@ public void setConf(Configuration conf) {
/**
* InputSplit implementation for ConfigurableEmployeeInputFormat.
*/
- public class ConfigurableEmployeeInputSplit extends InputSplit implements
Writable {
+ static class ConfigurableEmployeeInputSplit extends InputSplit implements
Writable {
@Override
- public void readFields(DataInput arg0) throws IOException {}
+ public void readFields(DataInput arg0) {}
@Override
- public void write(DataOutput arg0) throws IOException {}
+ public void write(DataOutput arg0) {}
@Override
- public long getLength() throws IOException, InterruptedException {
+ public long getLength() {
return 0;
}
@Override
- public String[] getLocations() throws IOException, InterruptedException {
+ public String[] getLocations() {
return null;
}
}
@@ -98,33 +98,33 @@ public long getLength() throws IOException,
InterruptedException {
/**
* RecordReader for ConfigurableEmployeeInputFormat.
*/
- public class ConfigurableEmployeeRecordReader extends RecordReader<Text,
Employee> {
+ static class ConfigurableEmployeeRecordReader extends RecordReader<Text,
Employee> {
@Override
- public void initialize(InputSplit paramInputSplit, TaskAttemptContext
paramTaskAttemptContext)
- throws IOException, InterruptedException {}
+ public void initialize(
+ InputSplit paramInputSplit, TaskAttemptContext
paramTaskAttemptContext) {}
@Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
+ public boolean nextKeyValue() {
return false;
}
@Override
- public Text getCurrentKey() throws IOException, InterruptedException {
+ public Text getCurrentKey() {
return null;
}
@Override
- public Employee getCurrentValue() throws IOException, InterruptedException
{
+ public Employee getCurrentValue() {
return null;
}
@Override
- public float getProgress() throws IOException, InterruptedException {
+ public float getProgress() {
return 0;
}
@Override
- public void close() throws IOException {}
+ public void close() {}
}
}
diff --git
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/EmployeeInputFormat.java
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/EmployeeInputFormat.java
index fbf4bc11048..4d75ec29666 100644
---
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/EmployeeInputFormat.java
+++
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/EmployeeInputFormat.java
@@ -14,6 +14,7 @@
*/
package org.apache.beam.sdk.io.hadoop.inputformat;
+import com.google.common.base.Splitter;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -37,18 +38,18 @@
* {@linkplain HadoopInputFormatIO } source returns immutable records in the
scenario when
* RecordReader creates new key and value objects every time it reads data.
*/
-public class EmployeeInputFormat extends InputFormat<Text, Employee> {
+class EmployeeInputFormat extends InputFormat<Text, Employee> {
public EmployeeInputFormat() {}
@Override
public RecordReader<Text, Employee> createRecordReader(InputSplit split,
- TaskAttemptContext context) throws IOException, InterruptedException {
+ TaskAttemptContext context) {
return new EmployeeRecordReader();
}
@Override
- public List<InputSplit> getSplits(JobContext arg0) throws IOException,
InterruptedException {
+ public List<InputSplit> getSplits(JobContext arg0) {
List<InputSplit> inputSplitList = new ArrayList<>();
for (int i = 1; i <= TestEmployeeDataSet.NUMBER_OF_SPLITS; i++) {
InputSplit inputSplitObj =
@@ -79,16 +80,16 @@ public NewObjectsEmployeeInputSplit(long startIndex, long
endIndex) {
* Returns number of records in each split.
*/
@Override
- public long getLength() throws IOException, InterruptedException {
+ public long getLength() {
return this.endIndex - this.startIndex + 1;
}
@Override
- public String[] getLocations() throws IOException, InterruptedException {
+ public String[] getLocations() {
return null;
}
- public long getStartIndex() {
+ long getStartIndex() {
return startIndex;
}
@@ -112,7 +113,7 @@ public void write(DataOutput dataOut) throws IOException {
/**
* RecordReader for EmployeeInputFormat.
*/
- public class EmployeeRecordReader extends RecordReader<Text, Employee> {
+ public static class EmployeeRecordReader extends RecordReader<Text,
Employee> {
private NewObjectsEmployeeInputSplit split;
private Text currentKey;
@@ -124,26 +125,25 @@ public void write(DataOutput dataOut) throws IOException {
public EmployeeRecordReader() {}
@Override
- public void close() throws IOException {}
+ public void close() {}
@Override
- public Text getCurrentKey() throws IOException, InterruptedException {
+ public Text getCurrentKey() {
return currentKey;
}
@Override
- public Employee getCurrentValue() throws IOException, InterruptedException
{
+ public Employee getCurrentValue() {
return currentValue;
}
@Override
- public float getProgress() throws IOException, InterruptedException {
+ public float getProgress() {
return (float) recordsRead / split.getLength();
}
@Override
- public void initialize(InputSplit split, TaskAttemptContext arg1) throws
IOException,
- InterruptedException {
+ public void initialize(InputSplit split, TaskAttemptContext arg1) {
this.split = (NewObjectsEmployeeInputSplit) split;
employeeListIndex = this.split.getStartIndex() - 1;
recordsRead = 0;
@@ -152,19 +152,20 @@ public void initialize(InputSplit split,
TaskAttemptContext arg1) throws IOExcep
}
@Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
+ public boolean nextKeyValue() {
if ((recordsRead++) >= split.getLength()) {
return false;
}
employeeListIndex++;
KV<String, String> employeeDetails = employeeDataList.get((int)
employeeListIndex);
- String empData[] = employeeDetails.getValue().split("_");
+
+ List<String> empData =
Splitter.on('_').splitToList(employeeDetails.getValue());
/*
* New objects must be returned every time for key and value in order to
test the scenario as
* discussed the in the class' javadoc.
*/
currentKey = new Text(employeeDetails.getKey());
- currentValue = new Employee(empData[0], empData[1]);
+ currentValue = new Employee(empData.get(0), empData.get(1));
return true;
}
}
diff --git
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOCassandraIT.java
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOCassandraIT.java
index 76537cfa8bd..ce0ba568915 100644
---
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOCassandraIT.java
+++
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOCassandraIT.java
@@ -109,7 +109,7 @@ public void testHIFReadForCassandra() {
pipeline.run().waitUntilFinish();
}
- SimpleFunction<Row, String> myValueTranslate = new SimpleFunction<Row,
String>() {
+ private final SimpleFunction<Row, String> myValueTranslate = new
SimpleFunction<Row, String>() {
@Override
public String apply(Row input) {
return input.getString("y_id") + "|" + input.getString("field0") + "|"
diff --git
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOElasticIT.java
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOElasticIT.java
index 59595513db7..10f1314ccc7 100644
---
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOElasticIT.java
+++
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOElasticIT.java
@@ -14,7 +14,6 @@
*/
package org.apache.beam.sdk.io.hadoop.inputformat;
-import java.io.IOException;
import java.io.Serializable;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -87,7 +86,7 @@ public static void setUp() {
* successfully.
*/
@Test
- public void testHifIOWithElastic() throws SecurityException, IOException {
+ public void testHifIOWithElastic() throws SecurityException {
// Expected hashcode is evaluated during insertion time one time and
hardcoded here.
final long expectedRowCount = 1000L;
String expectedHashCode = "42e254c8689050ed0a617ff5e80ea392";
@@ -106,7 +105,7 @@ public void testHifIOWithElastic() throws
SecurityException, IOException {
pipeline.run().waitUntilFinish();
}
- MapElements<LinkedMapWritable, String> transformFunc =
+ private final MapElements<LinkedMapWritable, String> transformFunc =
MapElements.via(
new SimpleFunction<LinkedMapWritable, String>() {
@Override
@@ -145,7 +144,7 @@ private String convertMapWRowToString(LinkedMapWritable
mapw) {
* separator.
*/
private String addFieldValuesToRow(String row, MapWritable mapw, String
columnName) {
- Object valueObj = (Object) mapw.get(new Text(columnName));
+ Object valueObj = mapw.get(new Text(columnName));
row += valueObj.toString() + "|";
return row;
}
diff --git
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java
index d38cb406dfb..ef4d7775ba0 100644
---
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java
+++
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java
@@ -17,12 +17,11 @@
*/
package org.apache.beam.sdk.io.hadoop.inputformat;
+import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.ServerSocket;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.common.HashingFn;
@@ -94,7 +93,7 @@
@BeforeClass
public static void startServer()
- throws NodeValidationException, InterruptedException, IOException {
+ throws NodeValidationException, IOException {
ServerSocket serverSocket = new ServerSocket(0);
int port = serverSocket.getLocalPort();
serverSocket.close();
@@ -125,7 +124,7 @@ public void testHifIOWithElastic() {
pipeline.run().waitUntilFinish();
}
- MapElements<LinkedMapWritable, String> transformFunc =
+ private final MapElements<LinkedMapWritable, String> transformFunc =
MapElements.via(
new SimpleFunction<LinkedMapWritable, String>() {
@Override
@@ -176,7 +175,7 @@ public void testHifIOWithElasticQuery() {
* <a
href="https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html"
* >Elasticsearch Configuration</a> for more details.
*/
- public Configuration getConfiguration() {
+ private Configuration getConfiguration() {
Configuration conf = new Configuration();
conf.set(ConfigurationOptions.ES_NODES, ELASTIC_IN_MEM_HOSTNAME);
conf.set(ConfigurationOptions.ES_PORT, String.format("%s",
elasticInMemPort));
@@ -210,8 +209,8 @@ public static void shutdownServer() throws IOException {
private static final long serialVersionUID = 1L;
private static Node node;
- public static void startElasticEmbeddedServer()
- throws NodeValidationException, InterruptedException {
+ static void startElasticEmbeddedServer()
+ throws NodeValidationException {
Settings settings = Settings.builder()
.put("node.data", TRUE)
.put("network.host", ELASTIC_IN_MEM_HOSTNAME)
@@ -232,7 +231,7 @@ public static void startElasticEmbeddedServer()
/**
* Prepares Elastic index, by adding rows.
*/
- private static void prepareElasticIndex() throws InterruptedException {
+ private static void prepareElasticIndex() {
CreateIndexRequest indexRequest = new
CreateIndexRequest(ELASTIC_INDEX_NAME);
node.client().admin().indices().create(indexRequest).actionGet();
for (int i = 0; i < TEST_DATA_ROW_COUNT; i++) {
@@ -246,7 +245,7 @@ private static void prepareElasticIndex() throws
InterruptedException {
* Shutdown the embedded instance.
* @throws IOException
*/
- public static void shutdown() throws IOException {
+ static void shutdown() throws IOException {
DeleteIndexRequest indexRequest = new
DeleteIndexRequest(ELASTIC_INDEX_NAME);
node.client().admin().indices().delete(indexRequest).actionGet();
LOG.info("Deleted index " + ELASTIC_INDEX_NAME + " from elastic in
memory server");
@@ -270,14 +269,11 @@ private static void deleteElasticDataDirectory() {
static class PluginNode extends Node implements Serializable {
private static final long serialVersionUID = 1L;
- static Collection<Class<? extends Plugin>> list = new ArrayList<>();
+ private static final ImmutableList<Class<? extends Plugin>> PLUGINS =
+ ImmutableList.of(Netty4Plugin.class);
- static {
- list.add(Netty4Plugin.class);
- }
-
- public PluginNode(final Settings settings) {
- super(InternalSettingsPreparer.prepareEnvironment(settings, null), list);
+ PluginNode(final Settings settings) {
+ super(InternalSettingsPreparer.prepareEnvironment(settings, null),
PLUGINS);
}
}
}
diff --git
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java
index 8f1ad767738..1c5d3eb1767 100644
---
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java
+++
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java
@@ -63,7 +63,7 @@
private static transient Cluster cluster;
private static transient Session session;
private static final long TEST_DATA_ROW_COUNT = 10L;
- private static EmbeddedCassandraService cassandra = new
EmbeddedCassandraService();
+ private static final EmbeddedCassandraService cassandra = new
EmbeddedCassandraService();
@Rule
public final transient TestPipeline p = TestPipeline.create();
@@ -74,7 +74,7 @@
* @throws Exception
*/
@Test
- public void testHIFReadForCassandra() throws Exception {
+ public void testHIFReadForCassandra() {
// Expected hashcode is evaluated during insertion time one time and
hardcoded here.
String expectedHashCode = "1b9780833cce000138b9afa25ba63486";
Configuration conf = getConfiguration();
@@ -92,20 +92,20 @@ public void testHIFReadForCassandra() throws Exception {
p.run().waitUntilFinish();
}
- SimpleFunction<Row, String> myValueTranslate = new SimpleFunction<Row,
String>() {
- @Override
- public String apply(Row input) {
- String scientistRecord = input.getInt("id") + "|" +
input.getString("scientist");
- return scientistRecord;
- }
- };
+ private final SimpleFunction<Row, String> myValueTranslate =
+ new SimpleFunction<Row, String>() {
+ @Override
+ public String apply(Row input) {
+ return input.getInt("id") + "|" + input.getString("scientist");
+ }
+ };
/**
* Test to read data from embedded Cassandra instance based on query and
verify whether data is
* read successfully.
*/
@Test
- public void testHIFReadForCassandraQuery() throws Exception {
+ public void testHIFReadForCassandraQuery() {
Long expectedCount = 1L;
String expectedChecksum = "f11caabc7a9fc170e22b41218749166c";
Configuration conf = getConfiguration();
@@ -129,7 +129,7 @@ public void testHIFReadForCassandraQuery() throws Exception
{
* class name, key class, value class are thrift port, thrift address,
partitioner class, keyspace
* and columnfamily name
*/
- public Configuration getConfiguration() {
+ private Configuration getConfiguration() {
Configuration conf = new Configuration();
conf.set(CASSANDRA_THRIFT_PORT_PROPERTY, CASSANDRA_PORT);
conf.set(CASSANDRA_THRIFT_ADDRESS_PROPERTY, CASSANDRA_HOST);
@@ -143,7 +143,7 @@ public Configuration getConfiguration() {
return conf;
}
- public static void createCassandraData() throws Exception {
+ private static void createCassandraData() {
session.execute("DROP KEYSPACE IF EXISTS " + CASSANDRA_KEYSPACE);
session.execute("CREATE KEYSPACE " + CASSANDRA_KEYSPACE
+ " WITH REPLICATION = {'class':'SimpleStrategy',
'replication_factor':1};");
@@ -173,7 +173,7 @@ public static void startCassandra() throws Exception {
}
@AfterClass
- public static void stopEmbeddedCassandra() throws Exception {
+ public static void stopEmbeddedCassandra() {
session.close();
cluster.close();
}
@@ -205,6 +205,7 @@ public void setId(int id) {
this.id = id;
}
+ @Override
public String toString() {
return id + ":" + name;
}
diff --git
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFITestOptions.java
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFITestOptions.java
index 06a7e045013..96a08a6731f 100644
---
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFITestOptions.java
+++
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFITestOptions.java
@@ -24,7 +24,7 @@
/**
* Properties needed when using HadoopInputFormatIO with the Beam SDK.
*/
-public interface HIFITestOptions extends TestPipelineOptions {
+interface HIFITestOptions extends TestPipelineOptions {
//Cassandra test options
@Description("Cassandra Server IP")
diff --git
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
index c472442b09c..be17286ddd7 100644
---
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
+++
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
@@ -63,9 +63,9 @@
*/
@RunWith(JUnit4.class)
public class HadoopInputFormatIOTest {
- static SerializableConfiguration serConf;
- static SimpleFunction<Text, String> myKeyTranslate;
- static SimpleFunction<Employee, String> myValueTranslate;
+ private static SerializableConfiguration serConf;
+ private static SimpleFunction<Text, String> myKeyTranslate;
+ private static SimpleFunction<Employee, String> myValueTranslate;
@Rule public final transient TestPipeline p = TestPipeline.create();
@Rule public ExpectedException thrown = ExpectedException.none();
@@ -73,7 +73,7 @@
private PBegin input = PBegin.in(p);
@BeforeClass
- public static void setUp() throws IOException, InterruptedException {
+ public static void setUp() {
serConf = loadTestConfiguration(
EmployeeInputFormat.class,
Text.class,
@@ -135,8 +135,7 @@ public void testReadBuildsCorrectlyInDifferentOrder() {
* @throws IOException
*/
@Test
- public void
testReadBuildsCorrectlyIfWithConfigurationIsCalledMoreThanOneTime()
- throws IOException, InterruptedException {
+ public void
testReadBuildsCorrectlyIfWithConfigurationIsCalledMoreThanOneTime() {
SerializableConfiguration diffConf =
loadTestConfiguration(
EmployeeInputFormat.class,
@@ -386,7 +385,7 @@ public String apply(LongWritable input) {
}
@Test
- public void testReadingData() throws Exception {
+ public void testReadingData() {
HadoopInputFormatIO.Read<Text, Employee> read = HadoopInputFormatIO.<Text,
Employee>read()
.withConfiguration(serConf.get());
List<KV<Text, Employee>> expected = TestEmployeeDataSet.getEmployeeData();
diff --git
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/ReuseObjectsEmployeeInputFormat.java
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/ReuseObjectsEmployeeInputFormat.java
index f1a48c29542..ad16a605a6a 100644
---
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/ReuseObjectsEmployeeInputFormat.java
+++
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/ReuseObjectsEmployeeInputFormat.java
@@ -14,6 +14,7 @@
*/
package org.apache.beam.sdk.io.hadoop.inputformat;
+import com.google.common.base.Splitter;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -46,18 +47,18 @@
* RecordReader returns the same key and value objects with updating values
every time it reads
* data.
*/
-public class ReuseObjectsEmployeeInputFormat extends InputFormat<Text,
Employee> {
+class ReuseObjectsEmployeeInputFormat extends InputFormat<Text, Employee> {
public ReuseObjectsEmployeeInputFormat() {}
@Override
public RecordReader<Text, Employee> createRecordReader(InputSplit split,
- TaskAttemptContext context) throws IOException, InterruptedException {
+ TaskAttemptContext context) {
return new ReuseObjectsEmployeeRecordReader();
}
@Override
- public List<InputSplit> getSplits(JobContext arg0) throws IOException,
InterruptedException {
+ public List<InputSplit> getSplits(JobContext arg0) {
List<InputSplit> inputSplitList = new ArrayList<>();
for (int i = 1; i <= TestEmployeeDataSet.NUMBER_OF_SPLITS; i++) {
InputSplit inputSplitObj = new ReuseEmployeeInputSplit(
@@ -71,31 +72,31 @@ public ReuseObjectsEmployeeInputFormat() {}
/**
* InputSplit implementation for ReuseObjectsEmployeeInputFormat.
*/
- public class ReuseEmployeeInputSplit extends InputSplit implements Writable {
+ static class ReuseEmployeeInputSplit extends InputSplit implements Writable {
// Start and end map index of each split of employeeData.
private long startIndex;
private long endIndex;
public ReuseEmployeeInputSplit() {}
- public ReuseEmployeeInputSplit(long startIndex, long endIndex) {
+ ReuseEmployeeInputSplit(long startIndex, long endIndex) {
this.startIndex = startIndex;
this.endIndex = endIndex;
}
/** Returns number of records in each split. */
@Override
- public long getLength() throws IOException, InterruptedException {
+ public long getLength() {
return this.endIndex - this.startIndex + 1;
}
@Override
- public String[] getLocations() throws IOException, InterruptedException {
+ public String[] getLocations() {
return null;
}
- public long getStartIndex() {
+ long getStartIndex() {
return startIndex;
}
@@ -119,38 +120,37 @@ public void write(DataOutput dataOut) throws IOException {
/**
* RecordReader for ReuseObjectsEmployeeInputFormat.
*/
- public class ReuseObjectsEmployeeRecordReader extends RecordReader<Text,
Employee> {
+ static class ReuseObjectsEmployeeRecordReader extends RecordReader<Text,
Employee> {
private ReuseEmployeeInputSplit split;
- private Text currentKey = new Text();
- private Employee currentValue = new Employee();
+ private final Text currentKey = new Text();
+ private final Employee currentValue = new Employee();
private long employeeListIndex = 0L;
private long recordsRead = 0L;
private List<KV<String, String>> employeeDataList;
- public ReuseObjectsEmployeeRecordReader() {}
+ ReuseObjectsEmployeeRecordReader() {}
@Override
- public void close() throws IOException {}
+ public void close() {}
@Override
- public Text getCurrentKey() throws IOException, InterruptedException {
+ public Text getCurrentKey() {
return currentKey;
}
@Override
- public Employee getCurrentValue() throws IOException, InterruptedException
{
+ public Employee getCurrentValue() {
return currentValue;
}
@Override
- public float getProgress() throws IOException, InterruptedException {
+ public float getProgress() {
return (float) recordsRead / split.getLength();
}
@Override
- public void initialize(InputSplit split, TaskAttemptContext arg1)
- throws IOException, InterruptedException {
+ public void initialize(InputSplit split, TaskAttemptContext arg1) {
this.split = (ReuseEmployeeInputSplit) split;
employeeListIndex = this.split.getStartIndex() - 1;
recordsRead = 0;
@@ -158,17 +158,17 @@ public void initialize(InputSplit split,
TaskAttemptContext arg1)
}
@Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
+ public boolean nextKeyValue() {
if ((recordsRead++) >= split.getLength()) {
return false;
}
employeeListIndex++;
KV<String, String> employeeDetails = employeeDataList.get((int)
employeeListIndex);
- String empData[] = employeeDetails.getValue().split("_");
+ List<String> empData =
Splitter.on('_').splitToList(employeeDetails.getValue());
// Updating the same key and value objects with new employee data.
currentKey.set(employeeDetails.getKey());
- currentValue.setEmpName(empData[0]);
- currentValue.setEmpAddress(empData[1]);
+ currentValue.setEmpName(empData.get(0));
+ currentValue.setEmpAddress(empData.get(1));
return true;
}
}
diff --git
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java
index 2d1c0880466..eafef0f9db3 100644
---
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java
+++
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java
@@ -14,6 +14,7 @@
*/
package org.apache.beam.sdk.io.hadoop.inputformat;
+import com.google.common.base.Splitter;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -24,7 +25,7 @@
* Test Utils used in {@link EmployeeInputFormat} and {@link
ReuseObjectsEmployeeInputFormat} for
* computing splits.
*/
-public class TestEmployeeDataSet {
+class TestEmployeeDataSet {
public static final long NUMBER_OF_RECORDS_IN_EACH_SPLIT = 5L;
public static final long NUMBER_OF_SPLITS = 3L;
private static final List<KV<String, String>> data = new ArrayList<>();
@@ -66,8 +67,8 @@
.stream()
.map(
input -> {
- String[] empData = input.getValue().split("_");
- return KV.of(new Text(input.getKey()), new Employee(empData[0],
empData[1]));
+ List<String> empData =
Splitter.on('_').splitToList(input.getValue());
+ return KV.of(new Text(input.getKey()), new
Employee(empData.get(0), empData.get(1)));
})
.collect(Collectors.toList());
}
diff --git
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java
index c9b2639b192..28769ca7acf 100644
---
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java
+++
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java
@@ -35,15 +35,17 @@
* {@link org.apache.hadoop.mapreduce.lib.db.DBInputFormat}.
*/
@DefaultCoder(AvroCoder.class)
-public class TestRowDBWritable extends TestRow implements DBWritable, Writable
{
+class TestRowDBWritable extends TestRow implements DBWritable, Writable {
private Integer id;
private String name;
- @Override public Integer id() {
+ @Override
+ public Integer id() {
return id;
}
+ @Override
public String name() {
return name;
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 105514)
Time Spent: 1h 10m (was: 1h)
> Enforce ErrorProne analysis in the hadoop IO projects
> -----------------------------------------------------
>
> Key: BEAM-4342
> URL: https://issues.apache.org/jira/browse/BEAM-4342
> Project: Beam
> Issue Type: Improvement
> Components: io-java-hadoop
> Reporter: Scott Wegner
> Assignee: Tim Robertson
> Priority: Minor
> Labels: errorprone, starter
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> Java ErrorProne static analysis was [recently
> enabled|https://github.com/apache/beam/pull/5161] in the Gradle build
> process, but only as warnings. ErrorProne errors are generally useful and
> easy to fix. Some work was done to [make sdks-java-core
> ErrorProne-clean|https://github.com/apache/beam/pull/5319] and add
> enforcement. This task is to clean ErrorProne warnings and add enforcement in
> {{beam-sdks-java-io-hadoop-*}}. Additional context discussed on the [dev
> list|https://lists.apache.org/thread.html/95aae2785c3cd728c2d3378cbdff2a7ba19caffcd4faa2049d2e2f46@%3Cdev.beam.apache.org%3E].
> Fixing this issue will involve:
> # Follow instructions in the [Contribution
> Guide|https://beam.apache.org/contribute/] to set up a {{beam}} development
> environment.
> # Run the following command to compile and run ErrorProne analysis on the
> project: {{./gradlew :beam-sdks-java-io-hadoop-common:assemble
> :beam-sdks-java-io-hadoop-file-system:assemble
> :beam-sdks-java-io-hadoop-input-format:assemble}}
> # Fix each ErrorProne warning from the {{sdks/java/io/hadoop*}} projects.
> # In {{sdks/java/io/hadoop-common/build.gradle}},
> {{sdks/java/io/hadoop-file-system/build.gradle}}, and
> {{sdks/java/io/hadoop-input-format/build.gradle}}, add {{failOnWarning:
> true}} to the call to {{applyJavaNature()}}
> ([example|https://github.com/apache/beam/pull/5319/files#diff-9390c20635aed5f42f83b97506a87333R20]).
> This starter issue is sponsored by [~swegner]. Feel free to [reach
> out|https://beam.apache.org/community/contact-us/] with questions or code
> review:
> * JIRA: [~swegner]
> * GitHub: [@swegner|https://github.com/swegner]
> * Slack: [@Scott Wegner|https://s.apache.org/beam-slack-channel]
> * Email: swegner at google dot com
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)