This is an automated email from the ASF dual-hosted git repository. granthenke pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 1179cfb8a2dbfee5cb1e955c9917bdbddc2827f6 Author: Grant Henke <[email protected]> AuthorDate: Sat Feb 8 11:00:07 2020 -0600 [java] Fix various build warnings This patches some small checkstyle, spotbugs, and gradle build warnings. Change-Id: I75649e6ad26eec1942811fcb790fc9a2c0866a6b Reviewed-on: http://gerrit.cloudera.org:8080/15191 Tested-by: Kudu Jenkins Reviewed-by: Adar Dembo <[email protected]> Reviewed-by: Alexey Serbin <[email protected]> --- java/config/spotbugs/excludeFilter.xml | 26 +++++++++++-- java/gradle.properties | 2 +- .../kudu/mapreduce/tools/ExportCsvMapper.java | 2 +- .../tools/IntegrationTestBigLinkedList.java | 43 ++++++++++++---------- .../java/org/apache/kudu/client/CallResponse.java | 10 +++++ .../main/java/org/apache/kudu/util/DateUtil.java | 6 +-- .../main/java/org/apache/kudu/util/HashUtil.java | 19 ++++++---- .../java/org/apache/kudu/client/TestSecurity.java | 4 +- .../java/org/apache/kudu/util/TestFashHash.java | 15 ++++---- .../kudu/subprocess/echo/TestEchoSubprocess.java | 31 ++++++++++------ .../java/org/apache/kudu/subprocess/MessageIO.java | 10 ++--- .../org/apache/kudu/subprocess/MessageWriter.java | 1 - .../apache/kudu/subprocess/ProtocolHandler.java | 16 ++++---- .../kudu/subprocess/SubprocessConfiguration.java | 2 +- .../apache/kudu/subprocess/SubprocessExecutor.java | 34 +++++++++-------- .../kudu/subprocess/SubprocessOutputStream.java | 4 +- .../org/apache/kudu/subprocess/TestMessageIO.java | 2 +- 17 files changed, 136 insertions(+), 91 deletions(-) diff --git a/java/config/spotbugs/excludeFilter.xml b/java/config/spotbugs/excludeFilter.xml index 4913579..2c785f6 100644 --- a/java/config/spotbugs/excludeFilter.xml +++ b/java/config/spotbugs/excludeFilter.xml @@ -112,14 +112,16 @@ <Bug pattern="BC_UNCONFIRMED_CAST_OF_RETURN_VALUE" /> <!-- Tests don't need to worry about serialization compatibility. --> <Bug pattern="SE_NO_SERIALVERSIONID" /> + <!-- Often tests testing exceptions have "exception" in the name. --> + <Bug pattern="NM_CLASS_NOT_EXCEPTION" /> <!-- Tests use i % 2 == 1 frequently to alternate behavior. --> <!-- TODO: converting these into negated "check for even" and remove. --> <Bug pattern="IM_BAD_CHECK_FOR_ODD" /> - <!-- The retry rule doesn't need to be read. - This naming scheme should be used for all RetryRule usage to avoid SpotBugs issues. + <!-- Junit rules don't need to be read. + This naming scheme should be used for all Junit Rule usage to avoid SpotBugs issues. --> <And> - <Field name="~.*retryRule" /> + <Field name="~.*Rule" /> <Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD" /> </And> <!-- We generally don't care about minor performance issues in tests. --> @@ -233,6 +235,24 @@ <Class name="org.apache.kudu.client.TestTableLocationsCache"/> <Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD" /> </Match> + <Match> + <!-- This is a private API and a null array is expected/handled. --> + <Class name="org.apache.kudu.util.NetUtil"/> + <Method name="getAllInetAddresses" /> + <Bug pattern="PZLA_PREFER_ZERO_LENGTH_ARRAYS" /> + </Match> + <Match> + <Class name="org.apache.kudu.util.HashUtil"/> + <Method name="fastHash64" /> + <Or> + <!-- This is the expected behavior. --> + <Bug pattern="BIT_ADD_OF_SIGNED_BYTE" /> + <!-- The fallthrough is intended and documented. --> + <Bug pattern="SF_SWITCH_FALLTHROUGH" /> + <!-- All cases are handled. --> + <Bug pattern="SF_SWITCH_NO_DEFAULT" /> + </Or> + </Match> <!-- kudu-client-tools exclusions --> <Match> diff --git a/java/gradle.properties b/java/gradle.properties index 739cb00..277a706 100755 --- a/java/gradle.properties +++ b/java/gradle.properties @@ -49,7 +49,7 @@ signing.gnupg.useLegacyGpg = true maxParallelForks = 1 # Warn about deprecated gradle usage -org.gradle.warning.mode = all +org.gradle.warning.mode = summary # Flags to speed up the gradle build. # https://docs.gradle.org/current/userguide/build_environment.html diff --git a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsvMapper.java b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsvMapper.java index 9692fc6..cbc2ff2 100644 --- a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsvMapper.java +++ b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsvMapper.java @@ -105,7 +105,7 @@ public class ExportCsvMapper extends Mapper<NullWritable, RowResult, NullWritabl buf.append(value.getBoolean(i)); break; case DATE: - buf.append(value.getInt(i)); + buf.append(value.getDate(i).toString()); break; default: buf.append("<unknown type!>"); diff --git a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java index 7730cd8..dbfc79f 100644 --- a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java +++ b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java @@ -416,7 +416,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { private int width; @Override - protected void setup(Context context) throws KuduException { + protected void setup(Mapper.Context context) throws KuduException { id = "Job: " + context.getJobID() + " Task: " + context.getTaskAttemptID(); Configuration conf = context.getConfiguration(); CommandLineParser parser = new CommandLineParser(conf); @@ -440,13 +440,14 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { } @Override - protected void cleanup(Context context) throws KuduException { + protected void cleanup(Mapper.Context context) throws KuduException { session.close(); client.shutdown(); } @Override - protected void map(BytesWritable key, NullWritable value, Context output) throws IOException { + protected void map(BytesWritable key, NullWritable value, Mapper.Context output) + throws IOException { current[position] = new byte[key.getLength()]; System.arraycopy(key.getBytes(), 0, current[position], 0, key.getLength()); if (++position == current.length) { @@ -490,7 +491,8 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { first[first.length - 1] = ez; } - private void persist(Context output, byte[][] data, boolean update) throws KuduException { + private void persist(Mapper.Context output, byte[][] data, boolean update) + throws KuduException { for (int i = 0; i < data.length; i++) { Operation put = update ? table.newUpdate() : table.newInsert(); PartialRow row = put.getRow(); @@ -690,11 +692,12 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { } } - public static class VerifyReducer extends Reducer<BytesWritable,BytesWritable,Text,Text> { + public static class VerifyReducer extends Reducer<BytesWritable, BytesWritable, Text, Text> { private ArrayList<byte[]> refs = new ArrayList<>(); @Override - public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context) + public void reduce(BytesWritable key, Iterable<BytesWritable> values, + Reducer<BytesWritable, BytesWritable, Text, Text>.Context context) throws IOException, InterruptedException { int defCount = 0; @@ -1089,7 +1092,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { private List<Pair<Long, Long>> headsCache; @Override - protected void setup(Context context) throws KuduException { + protected void setup(Mapper.Context context) throws KuduException { Configuration conf = context.getConfiguration(); CommandLineParser parser = new CommandLineParser(conf); client = parser.getClient(); @@ -1138,14 +1141,14 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { do { RowResult prev = nextNode(prevKeyOne, prevKeyTwo); if (prev == null) { - context.getCounter(Counts.BROKEN_LINKS).increment(1); + context.getCounter(Updater.Counts.BROKEN_LINKS).increment(1); LOG.warn(getStringFromKeys(prevKeyOne, prevKeyTwo) + " doesn't exist"); break; } // It's possible those columns are null, let's not break trying to read them. if (prev.isNull(0) || prev.isNull(1)) { - context.getCounter(Counts.BROKEN_LINKS).increment(1); + context.getCounter(Updater.Counts.BROKEN_LINKS).increment(1); LOG.warn(getStringFromKeys(prevKeyOne, prevKeyTwo) + " isn't referencing anywhere"); break; } @@ -1160,7 +1163,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { } if (prevCount != currentCount) { - context.getCounter(Counts.BAD_UPDATE_COUNTS).increment(1); + context.getCounter(Updater.Counts.BAD_UPDATE_COUNTS).increment(1); LOG.warn(getStringFromKeys(prevKeyOne, prevKeyTwo) + " has a wrong updateCount, " + prevCount + " instead of " + currentCount); // Game over, there's corruption. @@ -1168,14 +1171,14 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { } if (!prevClient.equals(client)) { - context.getCounter(Counts.BROKEN_LINKS).increment(1); + context.getCounter(Updater.Counts.BROKEN_LINKS).increment(1); LOG.warn(getStringFromKeys(prevKeyOne, prevKeyTwo) + " has the wrong client, " + "bad reference? Bad client= " + prevClient); break; } updateRow(prevKeyOne, prevKeyTwo, newCount); - context.getCounter(Counts.UPDATED_NODES).increment(1); + context.getCounter(Updater.Counts.UPDATED_NODES).increment(1); if (prevKeyOne % 10 == 0) { context.progress(); } @@ -1184,7 +1187,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { } while (headKeyOne != prevKeyOne && headKeyTwo != prevKeyTwo); updateStatCounters(context, newCount); - context.getCounter(Counts.UPDATED_LINKS).increment(1); + context.getCounter(Updater.Counts.UPDATED_LINKS).increment(1); } /** @@ -1217,19 +1220,19 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { // TODO We didn't event get the first node? break; case 1: - context.getCounter(Counts.FIRST_UPDATE).increment(1); + context.getCounter(Updater.Counts.FIRST_UPDATE).increment(1); break; case 2: - context.getCounter(Counts.SECOND_UPDATE).increment(1); + context.getCounter(Updater.Counts.SECOND_UPDATE).increment(1); break; case 3: - context.getCounter(Counts.THIRD_UPDATE).increment(1); + context.getCounter(Updater.Counts.THIRD_UPDATE).increment(1); break; case 4: - context.getCounter(Counts.FOURTH_UPDATE).increment(1); + context.getCounter(Updater.Counts.FOURTH_UPDATE).increment(1); break; default: - context.getCounter(Counts.MORE_THAN_FOUR_UPDATES).increment(1); + context.getCounter(Updater.Counts.MORE_THAN_FOUR_UPDATES).increment(1); break; } } @@ -1281,8 +1284,8 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { if (success) { // Let's not continue looping if we have broken linked lists. - Counter brokenLinks = counters.findCounter(Counts.BROKEN_LINKS); - Counter badUpdates = counters.findCounter(Counts.BAD_UPDATE_COUNTS); + Counter brokenLinks = counters.findCounter(Updater.Counts.BROKEN_LINKS); + Counter badUpdates = counters.findCounter(Updater.Counts.BAD_UPDATE_COUNTS); if (brokenLinks.getValue() > 0 || badUpdates.getValue() > 0) { LOG.error("Corruption was detected, see the job's counters. Ending the update loop."); success = false; diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java index 41fa35d..855419b 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java @@ -158,6 +158,16 @@ final class CallResponse extends DefaultByteBufHolder { return new Slice(payload, offset, length); } + @Override + public boolean equals(Object o) { + return super.equals(o); + } + + @Override + public int hashCode() { + return super.hashCode(); + } + /** * Netty decoder which receives incoming frames (ByteBuf) * and constructs CallResponse objects. diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/DateUtil.java b/java/kudu-client/src/main/java/org/apache/kudu/util/DateUtil.java index 10bb20d..af1c7d6 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/util/DateUtil.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/util/DateUtil.java @@ -32,7 +32,7 @@ public class DateUtil { /** * Check whether the date is within the range '0001-01-01':'9999-12-31' * - * @param the number days since the Unix epoch + * @param days the number days since the Unix epoch */ public static void checkDateWithinRange(long days) { if (days < MIN_DATE_VALUE || days > MAX_DATE_VALUE) { @@ -57,7 +57,7 @@ public class DateUtil { /** * Converts a number of days since the Unix epoch to a {@link java.sql.Date}. * - * @param the number of days since the Unix epoch + * @param days the number of days since the Unix epoch * @return the corresponding Date */ public static Date epochDaysToSqlDate(int days) { @@ -68,7 +68,7 @@ public class DateUtil { /** * Transforms a number of days since the Unix epoch into a string according the ISO-8601 format. * - * @param the number of days since the Unix epoch + * @param days the number of days since the Unix epoch * @return a string, in the format: YYYY-MM-DD */ public static String epochDaysToDateString(int days) { diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/HashUtil.java b/java/kudu-client/src/main/java/org/apache/kudu/util/HashUtil.java index 0ca8896..d8dc526 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/util/HashUtil.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/util/HashUtil.java @@ -32,7 +32,7 @@ public class HashUtil { * @param seed seed to compute the hash * @return computed 64-bit hash value */ - public static long FastHash64(final byte[] buf, int len, long seed) { + public static long fastHash64(final byte[] buf, int len, long seed) { final long m = 0x880355f21e6d1965L; long h = seed ^ (len * m); long v; @@ -45,12 +45,13 @@ public class HashUtil { ((long)buf[pos + 3] << 24) + ((long)buf[pos + 4] << 32) + ((long)buf[pos + 5] << 40) + ((long)buf[pos + 6] << 48) + ((long)buf[pos + 7] << 56); - h ^= FastHashMix(v); + h ^= fastHashMix(v); h *= m; } v = 0; int pos2 = len8 * 8; + //CHECKSTYLE:OFF switch (len & 7) { case 7: v ^= (long)buf[pos2 + 6] << 48; @@ -72,13 +73,15 @@ public class HashUtil { // fall through case 1: v ^= buf[pos2]; - h ^= FastHashMix(v); + h ^= fastHashMix(v); h *= m; } + //CHECKSTYLE:ON - return FastHashMix(h); + return fastHashMix(h); } + /** * Compute 32-bit FastHash of the supplied data backed by byte array. * @@ -90,19 +93,19 @@ public class HashUtil { * @param seed seed to compute the hash * @return computed 32-bit hash value */ - public static int FastHash32(final byte[] buf, int len, int seed) { + public static int fastHash32(final byte[] buf, int len, int seed) { // the following trick converts the 64-bit hashcode to Fermat // residue, which shall retain information from both the higher // and lower parts of hashcode. - long h = FastHash64(buf, len, seed); + long h = fastHash64(buf, len, seed); return (int)(h - (h >>> 32)); } // Compression function for Merkle-Damgard construction. - private static long FastHashMix(long h) { + private static long fastHashMix(long h) { h ^= h >>> 23; h *= 0x2127599bf4325c37L; h ^= h >>> 47; return h; } -}; +} diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java index 480fd13..d4f993d 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java @@ -86,8 +86,8 @@ public class TestSecurity { } if (opts.contains(Option.SHORT_TOKENS_AND_TICKETS)) { mcb.addMasterServerFlag("--authn_token_validity_seconds=" + TICKET_LIFETIME_SECS) - .kdcRenewLifetime(RENEWABLE_LIFETIME_SECS + "s") - .kdcTicketLifetime(TICKET_LIFETIME_SECS + "s"); + .kdcRenewLifetime(RENEWABLE_LIFETIME_SECS + "s") + .kdcTicketLifetime(TICKET_LIFETIME_SECS + "s"); } miniCluster = mcb.numMasterServers(3) .numTabletServers(opts.contains(Option.START_TSERVERS) ? 3 : 0) diff --git a/java/kudu-client/src/test/java/org/apache/kudu/util/TestFashHash.java b/java/kudu-client/src/test/java/org/apache/kudu/util/TestFashHash.java index 6fa28fa..fe08ba3 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/util/TestFashHash.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/util/TestFashHash.java @@ -20,10 +20,11 @@ package org.apache.kudu.util; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.Assert.assertEquals; -import org.apache.kudu.test.junit.RetryRule; import org.junit.Rule; import org.junit.Test; +import org.apache.kudu.test.junit.RetryRule; + /** * Test FastHash64/32 returns the expected values for inputs. * @@ -38,13 +39,13 @@ public class TestFashHash { public void testFastHash64() { long hash; - hash = HashUtil.FastHash64("ab".getBytes(UTF_8), 2, 0); + hash = HashUtil.fastHash64("ab".getBytes(UTF_8), 2, 0); assertEquals(Long.parseUnsignedLong("17293172613997361769"), hash); - hash = HashUtil.FastHash64("abcdefg".getBytes(UTF_8), 7, 0); + hash = HashUtil.fastHash64("abcdefg".getBytes(UTF_8), 7, 0); assertEquals(Long.parseUnsignedLong("10206404559164245992"), hash); - hash = HashUtil.FastHash64("quick brown fox".getBytes(UTF_8), 15, 42); + hash = HashUtil.fastHash64("quick brown fox".getBytes(UTF_8), 15, 42); assertEquals(Long.parseUnsignedLong("3757424404558187042"), hash); } @@ -52,13 +53,13 @@ public class TestFashHash { public void testFastHash32() { int hash; - hash = HashUtil.FastHash32("ab".getBytes(UTF_8), 2, 0); + hash = HashUtil.fastHash32("ab".getBytes(UTF_8), 2, 0); assertEquals(Integer.parseUnsignedInt("2564147595"), hash); - hash = HashUtil.FastHash32("abcdefg".getBytes(UTF_8), 7, 0); + hash = HashUtil.fastHash32("abcdefg".getBytes(UTF_8), 7, 0); assertEquals(Integer.parseUnsignedInt("1497700618"), hash); - hash = HashUtil.FastHash32("quick brown fox".getBytes(UTF_8), 15, 42); + hash = HashUtil.fastHash32("quick brown fox".getBytes(UTF_8), 15, 42); assertEquals(Integer.parseUnsignedInt("1676541068"), hash); } } diff --git a/java/kudu-subprocess-echo/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java b/java/kudu-subprocess-echo/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java index 79610de..338e55b 100644 --- a/java/kudu-subprocess-echo/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java +++ b/java/kudu-subprocess-echo/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java @@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream; import java.io.InputStream; import java.io.OutputStream; import java.io.PrintStream; +import java.io.UnsupportedEncodingException; import java.nio.charset.StandardCharsets; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; @@ -35,9 +36,9 @@ import org.junit.rules.RuleChain; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.kudu.subprocess.KuduSubprocessException; import org.apache.kudu.subprocess.MessageIO; import org.apache.kudu.subprocess.MessageTestUtil; -import org.apache.kudu.subprocess.KuduSubprocessException; import org.apache.kudu.subprocess.SubprocessExecutor; import org.apache.kudu.test.junit.RetryRule; @@ -46,12 +47,12 @@ import org.apache.kudu.test.junit.RetryRule; */ public class TestEchoSubprocess { private static final Logger LOG = LoggerFactory.getLogger(TestEchoSubprocess.class); - private static final Function<Throwable, Object> NO_ERR = e -> { + private static final Function<Throwable, Void> NO_ERR = e -> { LOG.error(String.format("Unexpected error: %s", e.getMessage())); - Assert.assertTrue(false); + Assert.fail(); return null; }; - private static final Function<Throwable, Object> HAS_ERR = e -> { + private static final Function<Throwable, Void> HAS_ERR = e -> { Assert.assertTrue(e instanceof KuduSubprocessException); return null; }; @@ -68,8 +69,9 @@ public class TestEchoSubprocess { public static class PrintStreamWithIOException extends PrintStream { - public PrintStreamWithIOException(OutputStream out) { - super(out); + public PrintStreamWithIOException(OutputStream out, boolean autoFlush, String encoding) + throws UnsupportedEncodingException { + super(out, autoFlush, encoding); } @Override @@ -81,7 +83,7 @@ public class TestEchoSubprocess { void runEchoSubprocess(InputStream in, PrintStream out, String[] args, - Function<Throwable, Object> errorHandler, + Function<Throwable, Void> errorHandler, boolean injectInterrupt) throws InterruptedException, ExecutionException, TimeoutException { System.setIn(in); @@ -103,7 +105,8 @@ public class TestEchoSubprocess { final byte[] messageBytes = MessageTestUtil.serializeMessage( MessageTestUtil.createEchoSubprocessRequest(message)); final InputStream in = new ByteArrayInputStream(messageBytes); - final PrintStream out = new PrintStream(new ByteArrayOutputStream()); + final PrintStream out = + new PrintStream(new ByteArrayOutputStream(), false, "UTF-8"); final String[] args = {""}; runEchoSubprocess(in, out, args, NO_ERR, /* injectInterrupt= */false); } @@ -115,7 +118,8 @@ public class TestEchoSubprocess { public void testMsgWithEmptyPayload() throws Exception { final byte[] emptyPayload = MessageIO.intToBytes(0); final InputStream in = new ByteArrayInputStream(emptyPayload); - final PrintStream out = new PrintStream(new ByteArrayOutputStream()); + final PrintStream out = + new PrintStream(new ByteArrayOutputStream(), false, "UTF-8"); final String[] args = {""}; runEchoSubprocess(in, out, args, NO_ERR, /* injectInterrupt= */false); } @@ -127,7 +131,8 @@ public class TestEchoSubprocess { public void testMalformedMsg() throws Exception { final byte[] messageBytes = "malformed".getBytes(StandardCharsets.UTF_8); final InputStream in = new ByteArrayInputStream(messageBytes); - final PrintStream out = new PrintStream(new ByteArrayOutputStream()); + final PrintStream out = + new PrintStream(new ByteArrayOutputStream(), false, "UTF-8"); final String[] args = {""}; thrown.expect(ExecutionException.class); thrown.expectMessage("Unable to read the protobuf message"); @@ -144,7 +149,8 @@ public class TestEchoSubprocess { final byte[] messageBytes = MessageTestUtil.serializeMessage( MessageTestUtil.createEchoSubprocessRequest(message)); final InputStream in = new ByteArrayInputStream(messageBytes); - final PrintStream out = new PrintStreamWithIOException(new ByteArrayOutputStream()); + final PrintStream out = + new PrintStreamWithIOException(new ByteArrayOutputStream(), false, "UTF-8"); // Only use one writer task to avoid get TimeoutException instead for // writer tasks that haven't encountered any exceptions. final String[] args = {"-w", "1"}; @@ -163,7 +169,8 @@ public class TestEchoSubprocess { final byte[] messageBytes = MessageTestUtil.serializeMessage( MessageTestUtil.createEchoSubprocessRequest(message)); final InputStream in = new ByteArrayInputStream(messageBytes); - final PrintStream out = new PrintStream(new ByteArrayOutputStream()); + final PrintStream out = + new PrintStream(new ByteArrayOutputStream(), false, "UTF-8"); final String[] args = {""}; thrown.expect(ExecutionException.class); thrown.expectMessage("Unable to put the message to the queue"); diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageIO.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageIO.java index ea58fac..bcb15f9 100644 --- a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageIO.java +++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageIO.java @@ -82,20 +82,20 @@ public class MessageIO { * stream into the specified byte array, starting at the offset <code>0</code>. * If it fails to read the specified size, <code>IOException</code> is thrown. * - * @return the message in byte array. * @throws EOFException if the end of the stream has been reached * @throws IOException if this input stream has been closed, an I/O * error occurs, or fail to read the specified size */ - private void doRead(byte bytes[], int size) throws EOFException, IOException { + private void doRead(byte[] bytes, int size) throws EOFException, IOException { Preconditions.checkNotNull(bytes); int read = in.read(bytes, 0, size); if (read == -1) { throw new EOFException("the end of the stream has been reached"); - } else if (read != size) + } else if (read != size) { throw new IOException( - String.format("unable to receive message, expected (%d) bytes " + - "but read (%d) bytes", size, read)); + String.format("unable to receive message, expected (%d) bytes " + + "but read (%d) bytes", size, read)); + } } /** diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java index d5afcee..e53bbf4 100644 --- a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java +++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.BlockingQueue; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.protobuf.InvalidProtocolBufferException; import org.apache.yetus.audience.InterfaceAudience; diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ProtocolHandler.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ProtocolHandler.java index 6df91b5..e29a9c0 100644 --- a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ProtocolHandler.java +++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ProtocolHandler.java @@ -29,12 +29,12 @@ import org.apache.kudu.subprocess.Subprocess.SubprocessResponsePB; /** * Protocol that represents how to handle a protobuf message. * - * @param <Request> The request protobuf message - * @param <Response> The response protobuf message + * @param <RequestT> The request protobuf message + * @param <ResponseT> The response protobuf message */ @InterfaceAudience.Private -public abstract class ProtocolHandler<Request extends Message, - Response extends Message> { +public abstract class ProtocolHandler<RequestT extends Message, + ResponseT extends Message> { /** * Processes the given SubprocessRequestPB message according to the @@ -49,8 +49,8 @@ public abstract class ProtocolHandler<Request extends Message, Preconditions.checkNotNull(request); SubprocessResponsePB.Builder builder = SubprocessResponsePB.newBuilder(); builder.setId(request.getId()); - Class<Request> requestType = getRequestClass(); - Response resp = createResponse(request.getRequest().unpack(requestType)); + Class<RequestT> requestType = getRequestClass(); + ResponseT resp = createResponse(request.getRequest().unpack(requestType)); builder.setResponse(Any.pack(resp)); return builder.build(); } @@ -61,12 +61,12 @@ public abstract class ProtocolHandler<Request extends Message, * @param request the request message * @return a response */ - protected abstract Response createResponse(Request request); + protected abstract ResponseT createResponse(RequestT request); /** * Gets the class instance of request message. * * @return the request class instance */ - protected abstract Class<Request> getRequestClass(); + protected abstract Class<RequestT> getRequestClass(); } diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java index bafa3ae..dc2e72c 100644 --- a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java +++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java @@ -19,8 +19,8 @@ package org.apache.kudu.subprocess; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.cli.BasicParser; -import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java index e3645a0..d531850 100644 --- a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java +++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java @@ -20,14 +20,16 @@ package org.apache.kudu.subprocess; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Function; import com.google.common.annotations.VisibleForTesting; @@ -48,20 +50,20 @@ import org.slf4j.LoggerFactory; @InterfaceAudience.Private public class SubprocessExecutor { private static final Logger LOG = LoggerFactory.getLogger(SubprocessExecutor.class); - private final Function<Throwable, Object> errorHandler; + private final Function<Throwable, Void> errorHandler; private boolean injectInterrupt = false; public SubprocessExecutor() { errorHandler = (t) -> { - // Exit the program with a nonzero status code if unexpected exception(s) - // thrown by the reader or writer tasks. - System.exit(1); - return null; + // If unexpected exception(s) are thrown by the reader or writer tasks, + // this error handler wraps the throwable in a runtime exception and rethrows, + // causing the program to exit with a nonzero status code. + throw new RuntimeException(t); }; } @VisibleForTesting - public SubprocessExecutor(Function<Throwable, Object> errorHandler) { + public SubprocessExecutor(Function<Throwable, Void> errorHandler) { this.errorHandler = errorHandler; } @@ -70,7 +72,7 @@ public class SubprocessExecutor { * * @param args the subprocess arguments * @param protocolHandler the subprocess protocol handler - * @param timeoutMs the maximum time to wait for subproces tasks to finish, -1 means + * @param timeoutMs the maximum time to wait for subprocess tasks to finish, -1 means * no time out and the tasks will continue execute until it finishes * @throws ExecutionException if any tasks of the subprocess completed exceptionally * @throws InterruptedException if the current thread was interrupted while waiting @@ -100,16 +102,16 @@ public class SubprocessExecutor { // Start a single reader thread and run the task asynchronously. MessageReader reader = new MessageReader(inboundQueue, messageIO, injectInterrupt); - CompletableFuture readerFuture = CompletableFuture.runAsync(reader, readerService); + CompletableFuture<Void> readerFuture = CompletableFuture.runAsync(reader, readerService); readerFuture.exceptionally(errorHandler); // Start multiple writer threads and run the tasks asynchronously. MessageWriter writer = new MessageWriter(inboundQueue, messageIO, protocolHandler); - CompletableFuture[] writerFutures = new CompletableFuture[maxWriterThread]; + List<CompletableFuture<Void>> writerFutures = new ArrayList<>(); for (int i = 0; i < maxWriterThread; i++) { - CompletableFuture writerFuture = CompletableFuture.runAsync(writer, writerService); + CompletableFuture<Void> writerFuture = CompletableFuture.runAsync(writer, writerService); writerFuture.exceptionally(errorHandler); - writerFutures[i] = writerFuture; + writerFutures.add(writerFuture); } // Wait until the tasks finish execution. -1 means the reader (or writer) tasks @@ -117,10 +119,10 @@ public class SubprocessExecutor { // to run forever, e.g. in tests, wait for the specified timeout. if (timeoutMs == -1) { readerFuture.join(); - CompletableFuture.allOf(writerFutures).join(); + CompletableFuture.allOf(writerFutures.toArray(new CompletableFuture[0])).join(); } else { readerFuture.get(timeoutMs, TimeUnit.MILLISECONDS); - CompletableFuture.allOf(writerFutures) + CompletableFuture.allOf(writerFutures.toArray(new CompletableFuture[0])) .get(timeoutMs, TimeUnit.MILLISECONDS); } } catch (IOException e) { diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessOutputStream.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessOutputStream.java index e3ce7e0..5690562 100644 --- a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessOutputStream.java +++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessOutputStream.java @@ -50,7 +50,7 @@ public class SubprocessOutputStream extends OutputStream { } @Override - public void write(byte buf[], int off, int len) throws IOException { + public void write(byte[] buf, int off, int len) throws IOException { out.write(buf, off, len); if (out.checkError()) { throw new IOException(WRITE_ERR); @@ -58,7 +58,7 @@ public class SubprocessOutputStream extends OutputStream { } @Override - public void write(byte b[]) throws IOException { + public void write(byte[] b) throws IOException { out.write(b); if (out.checkError()) { throw new IOException(WRITE_ERR); diff --git a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/TestMessageIO.java b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/TestMessageIO.java index abedf2b..0f99afa 100644 --- a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/TestMessageIO.java +++ b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/TestMessageIO.java @@ -49,7 +49,7 @@ public class TestMessageIO { // // See https://stackoverflow.com/q/28846088 for more information. @Rule - public RuleChain chain = RuleChain.outerRule(retryRule).around(thrown); + public RuleChain chainRule = RuleChain.outerRule(retryRule).around(thrown); public static class PrintStreamOverload extends PrintStream { public PrintStreamOverload(OutputStream out) {
