http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java b/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java index 78816c0..3a9e158 100644 --- a/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java +++ b/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java @@ -52,17 +52,17 @@ public class TestNettyServerWithCallbacks { private static Transceiver transceiver; private static Simple.Callback simpleClient; private static final AtomicBoolean ackFlag = new AtomicBoolean(false); - private static final AtomicReference<CountDownLatch> ackLatch = + private static final AtomicReference<CountDownLatch> ackLatch = new AtomicReference<CountDownLatch>(new CountDownLatch(1)); private static Simple simpleService = new SimpleImpl(ackFlag); - + @BeforeClass public static void initializeConnections() throws Exception { // start server Responder responder = new SpecificResponder(Simple.class, simpleService); server = new NettyServer(responder, new InetSocketAddress(0)); server.start(); - + int serverPort = server.getPort(); System.out.println("server port : " + serverPort); @@ -70,7 +70,7 @@ public class TestNettyServerWithCallbacks { serverPort), TestNettyServer.CONNECT_TIMEOUT_MILLIS); simpleClient = SpecificRequestor.getClient(Simple.Callback.class, transceiver); } - + @AfterClass public static void tearDownConnections() throws Exception { if (transceiver != null) { @@ -80,18 +80,18 @@ public class TestNettyServerWithCallbacks { server.close(); } } - + @Test public void greeting() throws Exception { // Test synchronous RPC: Assert.assertEquals("Hello, how are you?", simpleClient.hello("how are you?")); - + // Test asynchronous RPC (future): CallFuture<String> future1 = new CallFuture<String>(); simpleClient.hello("World!", future1); Assert.assertEquals("Hello, World!", future1.get(2, TimeUnit.SECONDS)); Assert.assertNull(future1.getError()); - + // Test asynchronous RPC (callback): final CallFuture<String> future2 = new CallFuture<String>(); simpleClient.hello("what's up?", new Callback<String>() { @@ -107,7 +107,7 @@ public class TestNettyServerWithCallbacks { Assert.assertEquals("Hello, what's up?", future2.get(2, TimeUnit.SECONDS)); Assert.assertNull(future2.getError()); } - + @Test public void echo() throws Exception { TestRecord record = TestRecord.newBuilder().setHash( @@ -115,16 +115,16 @@ public class TestNettyServerWithCallbacks { new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8 })). setKind(org.apache.avro.test.Kind.FOO). setName("My Record").build(); - + // Test synchronous RPC: Assert.assertEquals(record, simpleClient.echo(record)); - + // Test asynchronous RPC (future): CallFuture<TestRecord> future1 = new CallFuture<TestRecord>(); simpleClient.echo(record, future1); Assert.assertEquals(record, future1.get(2, TimeUnit.SECONDS)); Assert.assertNull(future1.getError()); - + // Test asynchronous RPC (callback): final CallFuture<TestRecord> future2 = new CallFuture<TestRecord>(); simpleClient.echo(record, new Callback<TestRecord>() { @@ -140,18 +140,18 @@ public class TestNettyServerWithCallbacks { Assert.assertEquals(record, future2.get(2, TimeUnit.SECONDS)); Assert.assertNull(future2.getError()); } - + @Test public void add() throws Exception { // Test synchronous RPC: Assert.assertEquals(8, simpleClient.add(2, 6)); - + // Test asynchronous RPC (future): CallFuture<Integer> future1 = new CallFuture<Integer>(); simpleClient.add(8, 8, future1); Assert.assertEquals(new Integer(16), future1.get(2, TimeUnit.SECONDS)); Assert.assertNull(future1.getError()); - + // Test asynchronous RPC (callback): final CallFuture<Integer> future2 = new CallFuture<Integer>(); simpleClient.add(512, 256, new Callback<Integer>() { @@ -167,20 +167,20 @@ public class TestNettyServerWithCallbacks { Assert.assertEquals(new Integer(768), future2.get(2, TimeUnit.SECONDS)); Assert.assertNull(future2.getError()); } - + @Test public void echoBytes() throws Exception { ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8 }); - + // Test synchronous RPC: Assert.assertEquals(byteBuffer, simpleClient.echoBytes(byteBuffer)); - + // Test asynchronous RPC (future): CallFuture<ByteBuffer> future1 = new CallFuture<ByteBuffer>(); simpleClient.echoBytes(byteBuffer, future1); Assert.assertEquals(byteBuffer, future1.get(2, TimeUnit.SECONDS)); Assert.assertNull(future1.getError()); - + // Test asynchronous RPC (callback): final CallFuture<ByteBuffer> future2 = new CallFuture<ByteBuffer>(); simpleClient.echoBytes(byteBuffer, new Callback<ByteBuffer>() { @@ -196,7 +196,7 @@ public class TestNettyServerWithCallbacks { Assert.assertEquals(byteBuffer, future2.get(2, TimeUnit.SECONDS)); Assert.assertNull(future2.getError()); } - + @Test() public void error() throws IOException, InterruptedException, TimeoutException { // Test synchronous RPC: @@ -209,7 +209,7 @@ public class TestNettyServerWithCallbacks { e.printStackTrace(); Assert.fail("Unexpected error: " + e.toString()); } - + // Test asynchronous RPC (future): CallFuture<Void> future = new CallFuture<Void>(); simpleClient.error(future); @@ -217,14 +217,14 @@ public class TestNettyServerWithCallbacks { future.get(2, TimeUnit.SECONDS); Assert.fail("Expected " + TestError.class.getCanonicalName() + " to be thrown"); } catch (ExecutionException e) { - Assert.assertTrue("Expected " + TestError.class.getCanonicalName(), + Assert.assertTrue("Expected " + TestError.class.getCanonicalName(), e.getCause() instanceof TestError); } Assert.assertNotNull(future.getError()); - Assert.assertTrue("Expected " + TestError.class.getCanonicalName(), + Assert.assertTrue("Expected " + TestError.class.getCanonicalName(), future.getError() instanceof TestError); Assert.assertNull(future.getResult()); - + // Test asynchronous RPC (callback): final CountDownLatch latch = new CountDownLatch(1); final AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>(); @@ -243,24 +243,24 @@ public class TestNettyServerWithCallbacks { Assert.assertNotNull(errorRef.get()); Assert.assertTrue(errorRef.get() instanceof TestError); } - + @Test public void ack() throws Exception { simpleClient.ack(); ackLatch.get().await(2, TimeUnit.SECONDS); Assert.assertTrue("Expected ack flag to be set", ackFlag.get()); - + ackLatch.set(new CountDownLatch(1)); simpleClient.ack(); ackLatch.get().await(2, TimeUnit.SECONDS); Assert.assertFalse("Expected ack flag to be cleared", ackFlag.get()); } - + @Test public void testSendAfterChannelClose() throws Exception { - // Start up a second server so that closing the server doesn't + // Start up a second server so that closing the server doesn't // interfere with the other unit tests: - Server server2 = new NettyServer(new SpecificResponder(Simple.class, simpleService), + Server server2 = new NettyServer(new SpecificResponder(Simple.class, simpleService), new InetSocketAddress(0)); server2.start(); try { @@ -270,12 +270,12 @@ public class TestNettyServerWithCallbacks { Transceiver transceiver2 = new NettyTransceiver(new InetSocketAddress( serverPort), TestNettyServer.CONNECT_TIMEOUT_MILLIS); try { - Simple.Callback simpleClient2 = + Simple.Callback simpleClient2 = SpecificRequestor.getClient(Simple.Callback.class, transceiver2); // Verify that connection works: Assert.assertEquals(3, simpleClient2.add(1, 2)); - + // Try again with callbacks: CallFuture<Integer> addFuture = new CallFuture<Integer>(); simpleClient2.add(1, 2, addFuture); @@ -285,7 +285,7 @@ public class TestNettyServerWithCallbacks { server2.close(); Thread.sleep(1000L); - // Send a new RPC, and verify that it throws an Exception that + // Send a new RPC, and verify that it throws an Exception that // can be detected by the client: boolean ioeCaught = false; try { @@ -299,8 +299,8 @@ public class TestNettyServerWithCallbacks { throw e; } Assert.assertTrue("Expected IOException", ioeCaught); - - // Send a new RPC with callback, and verify that the correct Exception + + // Send a new RPC with callback, and verify that the correct Exception // is thrown: ioeCaught = false; try { @@ -322,13 +322,13 @@ public class TestNettyServerWithCallbacks { server2.close(); } } - + @Test public void cancelPendingRequestsOnTransceiverClose() throws Exception { - // Start up a second server so that closing the server doesn't + // Start up a second server so that closing the server doesn't // interfere with the other unit tests: BlockingSimpleImpl blockingSimpleImpl = new BlockingSimpleImpl(); - Server server2 = new NettyServer(new SpecificResponder(Simple.class, + Server server2 = new NettyServer(new SpecificResponder(Simple.class, blockingSimpleImpl), new InetSocketAddress(0)); server2.start(); try { @@ -338,18 +338,18 @@ public class TestNettyServerWithCallbacks { CallFuture<Integer> addFuture = new CallFuture<Integer>(); Transceiver transceiver2 = new NettyTransceiver(new InetSocketAddress( serverPort), TestNettyServer.CONNECT_TIMEOUT_MILLIS); - try { - Simple.Callback simpleClient2 = + try { + Simple.Callback simpleClient2 = SpecificRequestor.getClient(Simple.Callback.class, transceiver2); - + // The first call has to block for the handshake: Assert.assertEquals(3, simpleClient2.add(1, 2)); - + // Now acquire the semaphore so that the server will block: blockingSimpleImpl.acquireRunPermit(); simpleClient2.add(1, 2, addFuture); } finally { - // When the transceiver is closed, the CallFuture should get + // When the transceiver is closed, the CallFuture should get // an IOException transceiver2.close(); } @@ -369,7 +369,7 @@ public class TestNettyServerWithCallbacks { server2.close(); } } - + @Test public void cancelPendingRequestsAfterChannelCloseByServerShutdown() throws Exception { // The purpose of this test is to verify that a client doesn't stay @@ -415,25 +415,25 @@ public class TestNettyServerWithCallbacks { } } }); - + // Start client call t.start(); - + // Wait until method is entered on the server side blockingSimpleImpl.acquireEnterPermit(); - + // The server side method is now blocked waiting on the run permit // (= is busy handling the request) - + // Stop the server server2.close(); - + // With the server gone, we expect the client to get some exception and exit // Wait for client thread to exit t.join(10000); - + Assert.assertFalse("Client request should not be blocked on server shutdown", t.isAlive()); - + } finally { blockingSimpleImpl.releaseRunPermit(); server2.close(); @@ -441,15 +441,15 @@ public class TestNettyServerWithCallbacks { transceiver2.close(); } } - + @Test public void clientReconnectAfterServerRestart() throws Exception { - // Start up a second server so that closing the server doesn't + // Start up a second server so that closing the server doesn't // interfere with the other unit tests: SimpleImpl simpleImpl = new BlockingSimpleImpl(); - Server server2 = new NettyServer(new SpecificResponder(Simple.class, + Server server2 = new NettyServer(new SpecificResponder(Simple.class, simpleImpl), new InetSocketAddress(0)); - try { + try { server2.start(); int serverPort = server2.getPort(); System.out.println("server2 port : " + serverPort); @@ -457,10 +457,10 @@ public class TestNettyServerWithCallbacks { // Initialize a client, and establish a connection to the server: Transceiver transceiver2 = new NettyTransceiver(new InetSocketAddress( serverPort), TestNettyServer.CONNECT_TIMEOUT_MILLIS); - Simple.Callback simpleClient2 = + Simple.Callback simpleClient2 = SpecificRequestor.getClient(Simple.Callback.class, transceiver2); Assert.assertEquals(3, simpleClient2.add(1, 2)); - + // Restart the server: server2.close(); try { @@ -471,11 +471,11 @@ public class TestNettyServerWithCallbacks { // Expected since server is no longer running } Thread.sleep(2000L); - server2 = new NettyServer(new SpecificResponder(Simple.class, + server2 = new NettyServer(new SpecificResponder(Simple.class, simpleImpl), new InetSocketAddress(serverPort)); server2.start(); - - // Invoke an RPC using the same client, which should reestablish the + + // Invoke an RPC using the same client, which should reestablish the // connection to the server: Assert.assertEquals(3, simpleClient2.add(1, 2)); } finally { @@ -489,7 +489,7 @@ public class TestNettyServerWithCallbacks { final int threadCount = 8; final long runTimeMillis = 10 * 1000L; ExecutorService threadPool = Executors.newFixedThreadPool(threadCount); - + System.out.println("Running performance test for " + runTimeMillis + "ms..."); final AtomicLong rpcCount = new AtomicLong(0L); final AtomicBoolean runFlag = new AtomicBoolean(true); @@ -511,23 +511,23 @@ public class TestNettyServerWithCallbacks { } }); } - + startLatch.await(2, TimeUnit.SECONDS); Thread.sleep(runTimeMillis); runFlag.set(false); threadPool.shutdown(); Assert.assertTrue("Timed out shutting down thread pool", threadPool.awaitTermination(2, TimeUnit.SECONDS)); - System.out.println("Completed " + rpcCount.get() + " RPCs in " + runTimeMillis + - "ms => " + (((double)rpcCount.get() / (double)runTimeMillis) * 1000) + " RPCs/sec, " + + System.out.println("Completed " + rpcCount.get() + " RPCs in " + runTimeMillis + + "ms => " + (((double)rpcCount.get() / (double)runTimeMillis) * 1000) + " RPCs/sec, " + ((double)runTimeMillis / (double)rpcCount.get()) + " ms/RPC."); } - + /** * Implementation of the Simple interface. */ private static class SimpleImpl implements Simple { private final AtomicBoolean ackFlag; - + /** * Creates a SimpleImpl. * @param ackFlag the AtomicBoolean to toggle when ack() is called. @@ -535,7 +535,7 @@ public class TestNettyServerWithCallbacks { public SimpleImpl(final AtomicBoolean ackFlag) { this.ackFlag = ackFlag; } - + @Override public String hello(String greeting) throws AvroRemoteException { return "Hello, " + greeting; @@ -567,7 +567,7 @@ public class TestNettyServerWithCallbacks { ackLatch.get().countDown(); } } - + /** * A SimpleImpl that requires a semaphore permit before executing any method. */ @@ -576,14 +576,14 @@ public class TestNettyServerWithCallbacks { private final Semaphore enterSemaphore = new Semaphore(1); /** Semaphore that must be acquired for the method to run and exit. */ private final Semaphore runSemaphore = new Semaphore(1); - + /** * Creates a BlockingSimpleImpl. */ public BlockingSimpleImpl() { super(new AtomicBoolean()); } - + @Override public String hello(String greeting) throws AvroRemoteException { releaseEnterPermit(); @@ -649,7 +649,7 @@ public class TestNettyServerWithCallbacks { releaseRunPermit(); } } - + /** * Acquires a single permit from the semaphore. */ @@ -661,7 +661,7 @@ public class TestNettyServerWithCallbacks { throw new RuntimeException(e); } } - + /** * Releases a single permit to the semaphore. */
http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCompression.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCompression.java b/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCompression.java index 98dc9e6..d805443 100644 --- a/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCompression.java +++ b/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCompression.java @@ -60,7 +60,7 @@ public class TestNettyServerWithCompression extends TestNettyServer{ channelFactory, new CompressionChannelPipelineFactory(), null); } - + protected static Transceiver initializeTransceiver(int serverPort) throws IOException { return new NettyTransceiver(new InetSocketAddress(serverPort), new CompressionChannelFactory(), http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithSSL.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithSSL.java b/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithSSL.java index 1611c01..a7f8e6a 100644 --- a/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithSSL.java +++ b/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithSSL.java @@ -42,7 +42,7 @@ import org.jboss.netty.handler.ssl.SslHandler; public class TestNettyServerWithSSL extends TestNettyServer{ public static final String TEST_CERTIFICATE = "servercert.p12"; public static final String TEST_CERTIFICATE_PASSWORD = "s3cret"; - + protected static Server initializeServer(Responder responder) { ChannelFactory channelFactory = new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), @@ -52,7 +52,7 @@ public class TestNettyServerWithSSL extends TestNettyServer{ channelFactory, new SSLChannelPipelineFactory(), null); } - + protected static Transceiver initializeTransceiver(int serverPort) throws IOException { return new NettyTransceiver(new InetSocketAddress(serverPort), new SSLChannelFactory(), http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java b/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java index d816fa5..02bc63d 100644 --- a/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java +++ b/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java @@ -35,17 +35,17 @@ import org.junit.Test; public class TestRpcPluginOrdering { private static AtomicInteger orderCounter = new AtomicInteger(); - + public class OrderPlugin extends RPCPlugin{ public void clientStartConnect(RPCContext context) { assertEquals(0, orderCounter.getAndIncrement()); } - + public void clientSendRequest(RPCContext context) { assertEquals(1, orderCounter.getAndIncrement()); } - + public void clientReceiveResponse(RPCContext context) { assertEquals(6, orderCounter.getAndIncrement()); } @@ -66,16 +66,16 @@ public class TestRpcPluginOrdering { assertEquals(4, orderCounter.getAndIncrement()); } } - + @Test public void testRpcPluginOrdering() throws Exception { OrderPlugin plugin = new OrderPlugin(); - + SpecificResponder responder = new SpecificResponder(Mail.class, new TestMailImpl()); SpecificRequestor requestor = new SpecificRequestor(Mail.class, new LocalTransceiver(responder)); responder.addRPCPlugin(plugin); requestor.addRPCPlugin(plugin); - + Mail client = SpecificRequestor.getClient(Mail.class, requestor); Message message = createTestMessage(); client.send(message); @@ -89,7 +89,7 @@ public class TestRpcPluginOrdering { build(); return message; } - + private static class TestMailImpl implements Mail{ public String send(Message message) throws AvroRemoteException { return "Received"; http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestSaslAnonymous.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestSaslAnonymous.java b/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestSaslAnonymous.java index 68b40bf..2034e05 100644 --- a/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestSaslAnonymous.java +++ b/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestSaslAnonymous.java @@ -66,9 +66,9 @@ public class TestSaslAnonymous extends TestProtocolGeneric { new SaslSocketTransceiver(new InetSocketAddress(s.getPort())); ProtoInterface proxy = (ProtoInterface)ReflectRequestor.getClient(ProtoInterface.class, client); - + byte[] result = proxy.test(new byte[64*1024]); - + client.close(); s.close(); } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestSaslDigestMd5.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestSaslDigestMd5.java b/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestSaslDigestMd5.java index cef4f77..34651eb 100644 --- a/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestSaslDigestMd5.java +++ b/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestSaslDigestMd5.java @@ -115,7 +115,7 @@ public class TestSaslDigestMd5 extends TestProtocolGeneric { Transceiver c = new SaslSocketTransceiver(new InetSocketAddress(s.getPort())); GenericRequestor requestor = new GenericRequestor(PROTOCOL, c); - GenericRecord params = + GenericRecord params = new GenericData.Record(PROTOCOL.getMessages().get("hello").getRequest()); params.put("greeting", "bob"); Utf8 response = (Utf8)requestor.request("hello", params); @@ -157,7 +157,7 @@ public class TestSaslDigestMd5 extends TestProtocolGeneric { Transceiver c = new SaslSocketTransceiver (new InetSocketAddress(server.getPort()), saslClient); GenericRequestor requestor = new GenericRequestor(PROTOCOL, c); - GenericRecord params = + GenericRecord params = new GenericData.Record(PROTOCOL.getMessages().get("hello").getRequest()); params.put("greeting", "bob"); Utf8 response = (Utf8)requestor.request("hello", params); http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/test/java/org/apache/avro/ipc/stats/TestHistogram.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/test/java/org/apache/avro/ipc/stats/TestHistogram.java b/lang/java/ipc/src/test/java/org/apache/avro/ipc/stats/TestHistogram.java index 0bdd700..9ab6eb7 100644 --- a/lang/java/ipc/src/test/java/org/apache/avro/ipc/stats/TestHistogram.java +++ b/lang/java/ipc/src/test/java/org/apache/avro/ipc/stats/TestHistogram.java @@ -47,10 +47,10 @@ public class TestHistogram { assertArrayEquals(new int[] { 1, 1, 2, 4, 8, 4 }, h.getHistogram()); assertEquals("[0,1)=1;[1,2)=1;[2,4)=2;[4,8)=4;[8,16)=8;[16,infinity)=4", h.toString()); - + String[] correctBucketLabels = { "[0,1)", "[1,2)", "[2,4)", "[4,8)", "[8,16)", "[16,infinity)"}; - + // test bucket iterator int pos = 0; Iterator<String> it = h.getSegmenter().getBuckets(); @@ -59,7 +59,7 @@ public class TestHistogram { pos = pos + 1; } assertEquals(correctBucketLabels.length, pos); - + List<String> labels = h.getSegmenter().getBucketLabels(); assertEquals(correctBucketLabels.length, labels.size()); if (labels.size() == correctBucketLabels.length) { @@ -71,14 +71,14 @@ public class TestHistogram { String[] correctBoundryLabels = { "0", "1", "2", "4", "8", "16"}; List<String> boundryLabels = h.getSegmenter().getBoundaryLabels(); - + assertEquals(correctBoundryLabels.length, boundryLabels.size()); if (boundryLabels.size() == correctBoundryLabels.length) { for (int i = 0; i < boundryLabels.size(); i++) { assertEquals(correctBoundryLabels[i], boundryLabels.get(i)); } } - + List<Entry<String>> entries = new ArrayList<Entry<String>>(); for (Entry<String> entry : h.entries()) { entries.add(entry); @@ -86,13 +86,13 @@ public class TestHistogram { assertEquals("[0,1)", entries.get(0).bucket); assertEquals(4, entries.get(5).count); assertEquals(6, entries.size()); - + h.add(1010); h.add(9191); List<Integer> recent = h.getRecentAdditions(); assertTrue(recent.contains(1010)); assertTrue(recent.contains(9191)); - + } @Test(expected=Histogram.SegmenterException.class) @@ -110,11 +110,11 @@ public class TestHistogram { public Iterator<String> getBuckets() { return Arrays.asList("X").iterator(); } - + public List<String> getBoundaryLabels() { return Arrays.asList("X"); } - + public List<String> getBucketLabels() { return Arrays.asList("X"); } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/test/java/org/apache/avro/ipc/stats/TestStatsPluginAndServlet.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/test/java/org/apache/avro/ipc/stats/TestStatsPluginAndServlet.java b/lang/java/ipc/src/test/java/org/apache/avro/ipc/stats/TestStatsPluginAndServlet.java index eb234a5..c0f9664 100644 --- a/lang/java/ipc/src/test/java/org/apache/avro/ipc/stats/TestStatsPluginAndServlet.java +++ b/lang/java/ipc/src/test/java/org/apache/avro/ipc/stats/TestStatsPluginAndServlet.java @@ -138,12 +138,12 @@ public class TestStatsPluginAndServlet { r.addRPCPlugin(statsPlugin); Transceiver t = new LocalTransceiver(r); makeRequest(t); - + String resp = generateServletResponse(statsPlugin); assertTrue(resp.contains("Average: 2.0")); - + } - + private RPCContext makeContext() { RPCContext context = new RPCContext(); context.setMessage(message); @@ -197,10 +197,10 @@ public class TestStatsPluginAndServlet { avroServer.start(); StatsServer ss = new StatsServer(p, 8080); - + HttpTransceiver trans = new HttpTransceiver( new URL("http://localhost:" + Integer.parseInt(args[0]))); - GenericRequestor req = new GenericRequestor(protocol, trans); + GenericRequestor req = new GenericRequestor(protocol, trans); while(true) { Thread.sleep(1000); http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificData.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificData.java b/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificData.java index 1a05a39..4518583 100644 --- a/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificData.java +++ b/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificData.java @@ -49,7 +49,7 @@ import org.apache.avro.test.Reserved; import org.apache.avro.generic.GenericRecord; public class TestSpecificData { - + @Test /** Make sure that even with nulls, hashCode() doesn't throw NPE. */ public void testHashCode() { http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificDatumReader.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificDatumReader.java b/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificDatumReader.java index 4b73de7..5b6cca3 100644 --- a/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificDatumReader.java +++ b/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificDatumReader.java @@ -40,7 +40,7 @@ import test.StringablesRecord; public class TestSpecificDatumReader { public static byte[] serializeRecord(FooBarSpecificRecord fooBarSpecificRecord) throws IOException { - SpecificDatumWriter<FooBarSpecificRecord> datumWriter = + SpecificDatumWriter<FooBarSpecificRecord> datumWriter = new SpecificDatumWriter<FooBarSpecificRecord>(FooBarSpecificRecord.SCHEMA$); ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); Encoder encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null); @@ -67,14 +67,14 @@ public class TestSpecificDatumReader { newBuilder.setNicknames(Arrays.asList("bar")); newBuilder.setRelatedids(Arrays.asList(1,2,3)); FooBarSpecificRecord specificRecord = newBuilder.build(); - + byte[] recordBytes = serializeRecord(specificRecord); - + Decoder decoder = DecoderFactory.get().binaryDecoder(recordBytes, null); SpecificDatumReader<FooBarSpecificRecord> specificDatumReader = new SpecificDatumReader<FooBarSpecificRecord>(FooBarSpecificRecord.SCHEMA$); FooBarSpecificRecord deserialized = new FooBarSpecificRecord(); specificDatumReader.read(deserialized, decoder); - + assertEquals(specificRecord, deserialized); } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificErrorBuilder.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificErrorBuilder.java b/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificErrorBuilder.java index 598a22a..1de8805 100644 --- a/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificErrorBuilder.java +++ b/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificErrorBuilder.java @@ -30,7 +30,7 @@ public class TestSpecificErrorBuilder { TestError.Builder testErrorBuilder = TestError.newBuilder(). setValue("value").setCause(new NullPointerException()). setMessage$("message$"); - + // Test has methods Assert.assertTrue(testErrorBuilder.hasValue()); Assert.assertNotNull(testErrorBuilder.getValue()); @@ -38,23 +38,23 @@ public class TestSpecificErrorBuilder { Assert.assertNotNull(testErrorBuilder.getCause()); Assert.assertTrue(testErrorBuilder.hasMessage$()); Assert.assertNotNull(testErrorBuilder.getMessage$()); - + TestError testError = testErrorBuilder.build(); Assert.assertEquals("value", testError.getValue()); Assert.assertEquals("value", testError.getMessage()); Assert.assertEquals("message$", testError.getMessage$()); - + // Test copy constructor - Assert.assertEquals(testErrorBuilder, + Assert.assertEquals(testErrorBuilder, TestError.newBuilder(testErrorBuilder)); Assert.assertEquals(testErrorBuilder, TestError.newBuilder(testError)); - + TestError error = new TestError("value", new NullPointerException()); error.setMessage$("message"); Assert.assertEquals(error, TestError.newBuilder().setValue("value"). setCause(new NullPointerException()).setMessage$("message").build()); - + // Test clear testErrorBuilder.clearValue(); Assert.assertFalse(testErrorBuilder.hasValue()); @@ -66,7 +66,7 @@ public class TestSpecificErrorBuilder { Assert.assertFalse(testErrorBuilder.hasMessage$()); Assert.assertNull(testErrorBuilder.getMessage$()); } - + @Test(expected=org.apache.avro.AvroRuntimeException.class) public void attemptToSetNonNullableFieldToNull() { TestError.newBuilder().setMessage$(null); http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificRecordBuilder.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificRecordBuilder.java b/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificRecordBuilder.java index a94b498..7305757 100644 --- a/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificRecordBuilder.java +++ b/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificRecordBuilder.java @@ -57,7 +57,7 @@ public class TestSpecificRecordBuilder { Assert.assertNull(builder.getFriends()); Assert.assertFalse(builder.hasLanguages()); Assert.assertNull(builder.getLanguages()); - + Person person = builder.build(); Assert.assertEquals("James Gosling", person.getName().toString()); Assert.assertEquals(new Integer(1955), person.getYearOfBirth()); @@ -69,11 +69,11 @@ public class TestSpecificRecordBuilder { Assert.assertEquals(2, person.getLanguages().size()); Assert.assertEquals("English", person.getLanguages().get(0).toString()); Assert.assertEquals("Java", person.getLanguages().get(1).toString()); - + // Test copy constructors: Assert.assertEquals(builder, Person.newBuilder(builder)); Assert.assertEquals(person, Person.newBuilder(person).build()); - + Person.Builder builderCopy = Person.newBuilder(person); Assert.assertEquals("James Gosling", builderCopy.getName().toString()); Assert.assertEquals(new Integer(1955), builderCopy.getYearOfBirth()); @@ -81,7 +81,7 @@ public class TestSpecificRecordBuilder { Assert.assertEquals("CA", builderCopy.getState().toString()); Assert.assertNotNull(builderCopy.getFriends()); // friends should default to an empty list Assert.assertEquals(0, builderCopy.getFriends().size()); - + // Test clearing fields: builderCopy.clearFriends().clearCountry(); Assert.assertFalse(builderCopy.hasFriends()); @@ -92,7 +92,7 @@ public class TestSpecificRecordBuilder { Assert.assertNotNull(person2.getFriends()); Assert.assertTrue(person2.getFriends().isEmpty()); } - + @Test public void testUnions() { long datetime = 1234L; @@ -106,15 +106,15 @@ public class TestSpecificRecordBuilder { Assert.assertEquals(datetime, p.getDatetime().longValue()); Assert.assertEquals(ProductPage.class, p.getPageContext().getClass()); Assert.assertEquals(product, ((ProductPage)p.getPageContext()).getProduct()); - + PageView p2 = PageView.newBuilder(p).build(); - + Assert.assertEquals(datetime, p2.getDatetime().longValue()); Assert.assertEquals(ProductPage.class, p2.getPageContext().getClass()); Assert.assertEquals(product, ((ProductPage)p2.getPageContext()).getProduct()); - + Assert.assertEquals(p, p2); - + } @Test @@ -136,7 +136,7 @@ public class TestSpecificRecordBuilder { .setStringField("MyInterop") .setUnionField(2.71828) .build(); - + Interop copy = Interop.newBuilder(interop).build(); Assert.assertEquals(interop.getArrayField().size(), copy.getArrayField().size()); Assert.assertEquals(interop.getArrayField(), copy.getArrayField()); @@ -154,7 +154,7 @@ public class TestSpecificRecordBuilder { Assert.assertEquals(interop.getUnionField(), copy.getUnionField()); Assert.assertEquals(interop, copy); } - + @Test(expected=org.apache.avro.AvroRuntimeException.class) public void attemptToSetNonNullableFieldToNull() { Person.newBuilder().setName(null); @@ -202,11 +202,11 @@ public class TestSpecificRecordBuilder { } long durationNanos = System.nanoTime() - startTimeNanos; double durationMillis = durationNanos / 1e6d; - System.out.println("Built " + count + " records in " + durationMillis + "ms (" + - (count / (durationMillis / 1000d)) + " records/sec, " + (durationMillis / count) + + System.out.println("Built " + count + " records in " + durationMillis + "ms (" + + (count / (durationMillis / 1000d)) + " records/sec, " + (durationMillis / count) + "ms/record"); } - + @Ignore @Test public void testBuilderPerformanceWithDefaultValues() { @@ -217,8 +217,8 @@ public class TestSpecificRecordBuilder { } long durationNanos = System.nanoTime() - startTimeNanos; double durationMillis = durationNanos / 1e6d; - System.out.println("Built " + count + " records in " + durationMillis + "ms (" + - (count / (durationMillis / 1000d)) + " records/sec, " + (durationMillis / count) + + System.out.println("Built " + count + " records in " + durationMillis + "ms (" + + (count / (durationMillis / 1000d)) + " records/sec, " + (durationMillis / count) + "ms/record"); } @@ -241,8 +241,8 @@ public class TestSpecificRecordBuilder { } long durationNanos = System.nanoTime() - startTimeNanos; double durationMillis = durationNanos / 1e6d; - System.out.println("Built " + count + " records in " + durationMillis + "ms (" + - (count / (durationMillis / 1000d)) + " records/sec, " + (durationMillis / count) + + System.out.println("Built " + count + " records in " + durationMillis + "ms (" + + (count / (durationMillis / 1000d)) + " records/sec, " + (durationMillis / count) + "ms/record"); } } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/pom.xml ---------------------------------------------------------------------- diff --git a/lang/java/mapred/pom.xml b/lang/java/mapred/pom.xml index 62b5544..4e2f417 100644 --- a/lang/java/mapred/pom.xml +++ b/lang/java/mapred/pom.xml @@ -159,7 +159,7 @@ <type>test-jar</type> <scope>test</scope> </dependency> - <dependency> + <dependency> <groupId>org.easymock</groupId> <artifactId>easymock</artifactId> <scope>test</scope> @@ -184,7 +184,7 @@ <version>${commons-codec.version}</version> </dependency> </dependencies> - + <profiles> <profile> <id>hadoop1</id> http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/HadoopCodecFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/HadoopCodecFactory.java b/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/HadoopCodecFactory.java index 1810208..9927ef9 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/HadoopCodecFactory.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/HadoopCodecFactory.java @@ -23,7 +23,7 @@ import java.util.Map; import org.apache.avro.AvroRuntimeException; import org.apache.avro.file.CodecFactory; -/** +/** * Encapsulates the ability to specify and configure an avro compression codec * from a given hadoop codec defined with the configuration parameter: * mapred.output.compression.codec @@ -40,14 +40,14 @@ public class HadoopCodecFactory { private static final Map<String, String> HADOOP_AVRO_NAME_MAP = new HashMap<String, String>(); - + static { HADOOP_AVRO_NAME_MAP.put("org.apache.hadoop.io.compress.DeflateCodec", "deflate"); HADOOP_AVRO_NAME_MAP.put("org.apache.hadoop.io.compress.SnappyCodec", "snappy"); HADOOP_AVRO_NAME_MAP.put("org.apache.hadoop.io.compress.BZip2Codec", "bzip2"); HADOOP_AVRO_NAME_MAP.put("org.apache.hadoop.io.compress.GZipCodec", "deflate"); } - + /** Maps a hadoop codec name into a CodecFactory. * * Currently there are four hadoop codecs registered: @@ -71,7 +71,7 @@ public class HadoopCodecFactory { } return o; } - + public static String getAvroCodecName(String hadoopCodecClass) { return HADOOP_AVRO_NAME_MAP.get(hadoopCodecClass); } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/SortedKeyValueFile.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/SortedKeyValueFile.java b/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/SortedKeyValueFile.java index f7a41bf..6b07220 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/SortedKeyValueFile.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/SortedKeyValueFile.java @@ -226,7 +226,7 @@ public class SortedKeyValueFile { mDataFileReader = new DataFileReader<GenericRecord> (new FsInput(dataFilePath, options.getConfiguration()), datumReader); - + } /** http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSequenceFile.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSequenceFile.java b/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSequenceFile.java index 73ab045..fd0fd8f 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSequenceFile.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSequenceFile.java @@ -96,7 +96,7 @@ public class AvroSequenceFile { options.getFileSystem(), options.getConfigurationWithAvroSerialization(), options.getOutputPath(), options.getKeyClass(), options.getValueClass(), options.getBufferSizeBytes(), options.getReplicationFactor(), - options.getBlockSizeBytes(), + options.getBlockSizeBytes(), options.getCompressionType(), options.getCompressionCodec(), options.getProgressable(), options.getMetadataWithAvroSchemas()); } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextInputFormat.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextInputFormat.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextInputFormat.java index ca7dab8..d0da121 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextInputFormat.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextInputFormat.java @@ -39,7 +39,7 @@ import org.apache.hadoop.mapred.Reporter; * <p> * This {@link org.apache.hadoop.mapred.InputFormat} is useful for applications * that wish to process Avro data using tools like MapReduce Streaming. - * + * * By default, when pointed at a directory, this will silently skip over any * files in it that do not have .avro extension. To instead include all files, * set the avro.mapred.ignore.inputs.without.extension property to false. @@ -59,7 +59,7 @@ public class AvroAsTextInputFormat extends FileInputFormat<Text, Text> { return super.listStatus(job); } } - + @Override public RecordReader<Text, Text> getRecordReader(InputSplit split, JobConf job, Reporter reporter) http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextRecordReader.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextRecordReader.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextRecordReader.java index 517b472..2ed2a61 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextRecordReader.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextRecordReader.java @@ -54,11 +54,11 @@ class AvroAsTextRecordReader<T> implements RecordReader<Text, Text> { public Text createKey() { return new Text(); } - + public Text createValue() { return new Text(); } - + public boolean next(Text key, Text ignore) throws IOException { if (!reader.hasNext() || reader.pastSync(end)) return false; @@ -80,7 +80,7 @@ class AvroAsTextRecordReader<T> implements RecordReader<Text, Text> { } return true; } - + public float getProgress() throws IOException { if (end == start) { return 0.0f; @@ -88,12 +88,12 @@ class AvroAsTextRecordReader<T> implements RecordReader<Text, Text> { return Math.min(1.0f, (getPos() - start) / (float)(end - start)); } } - + public long getPos() throws IOException { return reader.tell(); } public void close() throws IOException { reader.close(); } - + } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroInputFormat.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroInputFormat.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroInputFormat.java index 252339a..5d8bad0 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroInputFormat.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroInputFormat.java @@ -33,7 +33,7 @@ import org.apache.hadoop.mapred.RecordReader; /** * An {@link org.apache.hadoop.mapred.InputFormat} for Avro data files. - * + * * By default, when pointed at a directory, this will silently skip over any * files in it that do not have .avro extension. To instead include all files, * set the avro.mapred.ignore.inputs.without.extension property to false. @@ -44,11 +44,11 @@ public class AvroInputFormat<T> /** Whether to silently ignore input files without the .avro extension */ public static final String IGNORE_FILES_WITHOUT_EXTENSION_KEY = "avro.mapred.ignore.inputs.without.extension"; - + /** Default of whether to silently ignore input files without the .avro * extension. */ public static final boolean IGNORE_INPUTS_WITHOUT_EXTENSION_DEFAULT = true; - + @Override protected FileStatus[] listStatus(JobConf job) throws IOException { if (job.getBoolean(IGNORE_FILES_WITHOUT_EXTENSION_KEY, http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroJob.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroJob.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroJob.java index 21f130c..4e2a3c9 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroJob.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroJob.java @@ -127,12 +127,12 @@ public class AvroJob { setInputReflect(job); setMapOutputReflect(job); } - + /** Indicate that a job's input data should use reflect representation.*/ public static void setInputReflect(JobConf job) { job.setBoolean(INPUT_IS_REFLECT, true); } - + /** Indicate that a job's map output data should use reflect representation.*/ public static void setMapOutputReflect(JobConf job) { job.setBoolean(MAP_OUTPUT_IS_REFLECT, true); @@ -202,7 +202,7 @@ public class AvroJob { public static void setDataModelClass(JobConf job, Class<? extends GenericData> modelClass) { job.setClass(CONF_DATA_MODEL, modelClass, GenericData.class); } - + /** Return the job's data model implementation class. */ public static Class<? extends GenericData> getDataModelClass(Configuration conf) { return (Class<? extends GenericData>) conf.getClass( http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java index a1b4a1c..5ae03e3 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java @@ -42,10 +42,10 @@ import org.apache.hadoop.io.NullWritable; /** - * The AvroMultipleOutputs class simplifies writing Avro output data + * The AvroMultipleOutputs class simplifies writing Avro output data * to multiple outputs - * - * <p> + * + * <p> * Case one: writing to additional outputs other than the job default output. * * Each additional output, or named output, may be configured with its own @@ -57,16 +57,16 @@ import org.apache.hadoop.io.NullWritable; * <p> * Case two: to write data to different files provided by user * </p> - * + * * <p> - * AvroMultipleOutputs supports counters, by default they are disabled. The - * counters group is the {@link AvroMultipleOutputs} class name. The names of the - * counters are the same as the output name. These count the number of records + * AvroMultipleOutputs supports counters, by default they are disabled. The + * counters group is the {@link AvroMultipleOutputs} class name. The names of the + * counters are the same as the output name. These count the number of records * written to each output name. For multi * named outputs the name of the counter is the concatenation of the named * output, and underscore '_' and the multiname. * </p> - * + * * Usage pattern for job submission: * <pre> * @@ -79,7 +79,7 @@ import org.apache.hadoop.io.NullWritable; * job.setReducerClass(HadoopReducer.class); * job.set("avro.reducer",MyAvroReducer.class); * ... - * + * * Schema schema; * ... * // Defines additional single output 'avro1' for the job @@ -98,7 +98,7 @@ import org.apache.hadoop.io.NullWritable; * <p> * Usage in Reducer: * <pre> - * + * * public class MyAvroReducer extends * AvroReducer<K, V, OUT> { * private MultipleOutputs amos; @@ -140,8 +140,8 @@ public class AvroMultipleOutputs { private static final String MULTI = ".multi"; private static final String COUNTERS_ENABLED = "mo.counters"; - - + + /** * Counters group used by the counters of MultipleOutputs. */ @@ -444,7 +444,7 @@ public class AvroMultipleOutputs { writer.close(reporter); } } - + /** * Output Collector for the default schema. * <p/> @@ -457,7 +457,7 @@ public class AvroMultipleOutputs { public void collect(String namedOutput, Reporter reporter,Object datum) throws IOException{ getCollector(namedOutput,reporter).collect(datum); } - + /** * OutputCollector with custom schema. * <p/> @@ -471,7 +471,7 @@ public class AvroMultipleOutputs { public void collect(String namedOutput, Reporter reporter, Schema schema,Object datum) throws IOException{ getCollector(namedOutput,reporter,schema).collect(datum); } - + /** * OutputCollector with custom schema and file name. * <p/> @@ -486,7 +486,7 @@ public class AvroMultipleOutputs { public void collect(String namedOutput,Reporter reporter,Schema schema,Object datum,String baseOutputPath) throws IOException{ getCollector(namedOutput,null,reporter,baseOutputPath,schema).collect(datum); } - + /** * Gets the output collector for a named output. * <p/> @@ -508,14 +508,14 @@ public class AvroMultipleOutputs { throws IOException{ return getCollector(namedOutput,null,reporter,namedOutput,schema); } - + /** * Gets the output collector for a named output. * <p/> * * @param namedOutput the named output name * @param reporter the reporter - * @param multiName the multiname + * @param multiName the multiname * @return the output collector for the given named output * @throws IOException thrown if output collector could not be created */ @@ -530,8 +530,8 @@ public class AvroMultipleOutputs { throws IOException{ //namedOutputs.add(baseFileName); return getCollector(namedOutput,null,reporter,baseFileName,schema); - } - + } + /** * Gets the output collector for a multi named output. * <p/> @@ -568,18 +568,18 @@ public class AvroMultipleOutputs { getRecordWriter(namedOutput, baseFileName, reporter,schema); return new AvroCollector() { - + @SuppressWarnings({"unchecked"}) public void collect(Object key) throws IOException{ AvroWrapper wrapper = new AvroWrapper(key); writer.write(wrapper, NullWritable.get()); } - + public void collect(Object key,Object value) throws IOException { writer.write(key,value); - } - + } + }; } @@ -597,7 +597,7 @@ public class AvroMultipleOutputs { writer.close(null); } } - + private static class InternalFileOutputFormat extends FileOutputFormat<Object, Object> { public static final String CONFIG_NAMED_OUTPUT = "mo.config.namedOutput"; @@ -620,7 +620,7 @@ public class AvroMultipleOutputs { } OutputFormat outputFormat = outputConf.getOutputFormat(); return outputFormat.getRecordWriter(fs, outputConf, fileName, arg3); - } + } } } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java index 2a681cd..235f768 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java @@ -76,16 +76,16 @@ public class AvroOutputFormat <T> public static void setSyncInterval(JobConf job, int syncIntervalInBytes) { job.setInt(SYNC_INTERVAL_KEY, syncIntervalInBytes); } - + static <T> void configureDataFileWriter(DataFileWriter<T> writer, JobConf job) throws UnsupportedEncodingException { - + CodecFactory factory = getCodecFactory(job); - + if (factory != null) { - writer.setCodec(factory); + writer.setCodec(factory); } - + writer.setSyncInterval(job.getInt(SYNC_INTERVAL_KEY, DEFAULT_SYNC_INTERVAL)); // copy metadata from job @@ -107,16 +107,16 @@ public class AvroOutputFormat <T> * <li>Use avro.output.codec if populated</li> * <li>Next use mapred.output.compression.codec if populated</li> * <li>If not default to Deflate Codec</li> - * </ul> + * </ul> */ static CodecFactory getCodecFactory(JobConf job) { CodecFactory factory = null; - + if (FileOutputFormat.getCompressOutput(job)) { int deflateLevel = job.getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL); int xzLevel = job.getInt(XZ_LEVEL_KEY, DEFAULT_XZ_LEVEL); String codecName = job.get(AvroJob.OUTPUT_CODEC); - + if (codecName == null) { String codecClassName = job.get("mapred.output.compression.codec", null); String avroCodecName = HadoopCodecFactory.getAvroCodecName(codecClassName); @@ -127,7 +127,7 @@ public class AvroOutputFormat <T> } else { return CodecFactory.deflateCodec(deflateLevel); } - } else { + } else { if ( codecName.equals(DEFLATE_CODEC)) { factory = CodecFactory.deflateCodec(deflateLevel); } else if ( codecName.equals(XZ_CODEC)) { @@ -137,7 +137,7 @@ public class AvroOutputFormat <T> } } } - + return factory; } @@ -155,7 +155,7 @@ public class AvroOutputFormat <T> final DataFileWriter<T> writer = new DataFileWriter<T>(dataModel.createDatumWriter(null)); - + configureDataFileWriter(writer, job); Path path = FileOutputFormat.getTaskOutputPath(job, name+EXT); http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroRecordReader.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroRecordReader.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroRecordReader.java index c173d05..351d3c5 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroRecordReader.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroRecordReader.java @@ -56,9 +56,9 @@ public class AvroRecordReader<T> public AvroWrapper<T> createKey() { return new AvroWrapper<T>(null); } - + public NullWritable createValue() { return NullWritable.get(); } - + public boolean next(AvroWrapper<T> wrapper, NullWritable ignore) throws IOException { if (!reader.hasNext() || reader.pastSync(end)) @@ -66,7 +66,7 @@ public class AvroRecordReader<T> wrapper.datum(reader.next(wrapper.datum())); return true; } - + public float getProgress() throws IOException { if (end == start) { return 0.0f; @@ -74,12 +74,12 @@ public class AvroRecordReader<T> return Math.min(1.0f, (getPos() - start) / (float)(end - start)); } } - + public long getPos() throws IOException { return reader.tell(); } public void close() throws IOException { reader.close(); } - + } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroSerialization.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroSerialization.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroSerialization.java index 92501bf..fa8334b 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroSerialization.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroSerialization.java @@ -38,13 +38,13 @@ import org.apache.avro.io.DatumWriter; import org.apache.avro.io.EncoderFactory; /** The {@link Serialization} used by jobs configured with {@link AvroJob}. */ -public class AvroSerialization<T> extends Configured +public class AvroSerialization<T> extends Configured implements Serialization<AvroWrapper<T>> { public boolean accept(Class<?> c) { return AvroWrapper.class.isAssignableFrom(c); } - + /** Returns the specified map output deserializer. Defaults to the final * output deserializer if no map output schema was specified. */ public Deserializer<AvroWrapper<T>> getDeserializer(Class<AvroWrapper<T>> c) { @@ -57,7 +57,7 @@ public class AvroSerialization<T> extends Configured DatumReader<T> datumReader = dataModel.createDatumReader(schema); return new AvroWrapperDeserializer(datumReader, isKey); } - + private static final DecoderFactory FACTORY = DecoderFactory.get(); private class AvroWrapperDeserializer @@ -66,16 +66,16 @@ public class AvroSerialization<T> extends Configured private DatumReader<T> reader; private BinaryDecoder decoder; private boolean isKey; - + public AvroWrapperDeserializer(DatumReader<T> reader, boolean isKey) { this.reader = reader; this.isKey = isKey; } - + public void open(InputStream in) { this.decoder = FACTORY.directBinaryDecoder(in, decoder); } - + public AvroWrapper<T> deserialize(AvroWrapper<T> wrapper) throws IOException { T datum = reader.read(wrapper == null ? null : wrapper.datum(), decoder); @@ -90,9 +90,9 @@ public class AvroSerialization<T> extends Configured public void close() throws IOException { decoder.inputStream().close(); } - + } - + /** Returns the specified output serializer. */ public Serializer<AvroWrapper<T>> getSerializer(Class<AvroWrapper<T>> c) { // AvroWrapper used for final output, AvroKey or AvroValue for map output @@ -112,7 +112,7 @@ public class AvroSerialization<T> extends Configured private DatumWriter<T> writer; private OutputStream out; private BinaryEncoder encoder; - + public AvroWrapperSerializer(DatumWriter<T> writer) { this.writer = writer; } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroTextOutputFormat.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroTextOutputFormat.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroTextOutputFormat.java index ef1fae9..917e894 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroTextOutputFormat.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroTextOutputFormat.java @@ -49,7 +49,7 @@ public class AvroTextOutputFormat<K, V> extends FileOutputFormat<K, V> { throws IOException { Schema schema = Schema.create(Schema.Type.BYTES); - + final byte[] keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", "\t").getBytes(UTF8); @@ -63,17 +63,17 @@ public class AvroTextOutputFormat<K, V> extends FileOutputFormat<K, V> { return new AvroTextRecordWriter(writer, keyValueSeparator); } - + class AvroTextRecordWriter implements RecordWriter<K, V> { private final DataFileWriter<ByteBuffer> writer; private final byte[] keyValueSeparator; - + public AvroTextRecordWriter(DataFileWriter<ByteBuffer> writer, byte[] keyValueSeparator) { this.writer = writer; this.keyValueSeparator = keyValueSeparator; } - + public void write(K key, V value) throws IOException { boolean nullKey = key == null || key instanceof NullWritable; boolean nullValue = value == null || value instanceof NullWritable; @@ -87,11 +87,11 @@ public class AvroTextOutputFormat<K, V> extends FileOutputFormat<K, V> { writer.append(toByteBuffer(key, keyValueSeparator, value)); } } - + public void close(Reporter reporter) throws IOException { writer.close(); } - + private ByteBuffer toByteBuffer(Object o) throws IOException { if (o instanceof Text) { Text to = (Text) o; @@ -100,7 +100,7 @@ public class AvroTextOutputFormat<K, V> extends FileOutputFormat<K, V> { return ByteBuffer.wrap(o.toString().getBytes(UTF8)); } } - + private ByteBuffer toByteBuffer(Object key, byte[] sep, Object value) throws IOException { byte[] keyBytes, valBytes; http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroUtf8InputFormat.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroUtf8InputFormat.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroUtf8InputFormat.java index ac91109..abd8ae4 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroUtf8InputFormat.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroUtf8InputFormat.java @@ -49,15 +49,15 @@ public class AvroUtf8InputFormat RecordReader<AvroWrapper<Utf8>, NullWritable> { private LineRecordReader lineRecordReader; - + private LongWritable currentKeyHolder = new LongWritable(); private Text currentValueHolder = new Text(); - - public Utf8LineRecordReader(Configuration job, + + public Utf8LineRecordReader(Configuration job, FileSplit split) throws IOException { this.lineRecordReader = new LineRecordReader(job, split); } - + public void close() throws IOException { lineRecordReader.close(); } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroWrapper.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroWrapper.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroWrapper.java index 12c4d9e..71112af 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroWrapper.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroWrapper.java @@ -33,7 +33,7 @@ public class AvroWrapper<T> { /** Set the wrapped datum. */ public void datum(T datum) { this.datum = datum; } - + public int hashCode() { return (datum == null) ? 0 : datum.hashCode(); } @@ -53,7 +53,7 @@ public class AvroWrapper<T> { return false; return true; } - + /** Get the wrapped datum as JSON. */ @Override public String toString() { http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopCombiner.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopCombiner.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopCombiner.java index 5f914fb..2717510 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopCombiner.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopCombiner.java @@ -41,7 +41,7 @@ class HadoopCombiner<K,V> private final AvroKey<K> keyWrapper = new AvroKey<K>(null); private final AvroValue<V> valueWrapper = new AvroValue<V>(null); private OutputCollector<AvroKey<K>,AvroValue<V>> collector; - + public PairCollector(OutputCollector<AvroKey<K>,AvroValue<V>> collector) { this.collector = collector; } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java index 35f11d6..54e1609 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java @@ -33,7 +33,7 @@ import org.apache.hadoop.util.ReflectionUtils; * otherwise assumed to be pairs that are split. */ class HadoopMapper<IN,OUT,K,V,KO,VO> extends MapReduceBase implements Mapper<AvroWrapper<IN>, NullWritable, KO, VO> { - + private AvroMapper<IN,OUT> mapper; private MapCollector<OUT,K,V,KO,VO> out; private boolean isMapOnly; @@ -48,8 +48,8 @@ class HadoopMapper<IN,OUT,K,V,KO,VO> extends MapReduceBase } @Override - public void map(AvroWrapper<IN> wrapper, NullWritable value, - OutputCollector<KO,VO> collector, + public void map(AvroWrapper<IN> wrapper, NullWritable value, + OutputCollector<KO,VO> collector, Reporter reporter) throws IOException { if (this.out == null) this.out = new MapCollector<OUT,K,V,KO,VO>(collector, isMapOnly); http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java index 6874969..b806f76 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java @@ -29,10 +29,10 @@ import org.apache.hadoop.mapred.Reducer; abstract class HadoopReducerBase<K,V,OUT,KO,VO> extends MapReduceBase implements Reducer<AvroKey<K>, AvroValue<V>, KO, VO> { - + private AvroReducer<K,V,OUT> reducer; private AvroCollector<OUT> collector; - + protected abstract AvroReducer<K,V,OUT> getReducer(JobConf conf); protected abstract AvroCollector<OUT> getCollector(OutputCollector<KO,VO> c); @@ -52,9 +52,9 @@ abstract class HadoopReducerBase<K,V,OUT,KO,VO> extends MapReduceBase @Override public final void reduce(AvroKey<K> key, Iterator<AvroValue<V>> values, - OutputCollector<KO, VO> out, + OutputCollector<KO, VO> out, Reporter reporter) throws IOException { - if (this.collector == null) + if (this.collector == null) this.collector = getCollector(out); reduceIterable.values = values; reducer.reduce(key.datum(), reduceIterable, collector, reporter); http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/Pair.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/Pair.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/Pair.java index 010b08d..a33de99 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/Pair.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/Pair.java @@ -74,7 +74,7 @@ public class Pair<K,V> return pair.getField(VALUE).schema(); } - private static final Map<Schema,Map<Schema,Schema>> SCHEMA_CACHE = + private static final Map<Schema,Map<Schema,Schema>> SCHEMA_CACHE = new WeakHashMap<Schema,Map<Schema,Schema>>(); /** Get a pair schema. */ @@ -144,7 +144,7 @@ public class Pair<K,V> case 0: return key; case 1: return value; default: throw new org.apache.avro.AvroRuntimeException("Bad index: "+i); - } + } } @Override @SuppressWarnings("unchecked") @@ -153,7 +153,7 @@ public class Pair<K,V> case 0: this.key = (K)o; break; case 1: this.value = (V)o; break; default: throw new org.apache.avro.AvroRuntimeException("Bad index: "+i); - } + } } private static final Schema STRING_SCHEMA = Schema.create(Type.STRING); @@ -510,11 +510,11 @@ public class Pair<K,V> // {"Double", "DOUBLE_SCHEMA"}, // {"Void", "NULL_SCHEMA"}, // }; - + // private static String f(String pattern, String value) { // return java.text.MessageFormat.format(pattern, value); // } - + // public static void main(String... args) throws Exception { // StringBuffer b = new StringBuffer(); // for (String[] k : TABLE) { http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/SequenceFileReader.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/SequenceFileReader.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/SequenceFileReader.java index 83c9de1..36c9b61 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/SequenceFileReader.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/SequenceFileReader.java @@ -148,7 +148,7 @@ public class SequenceFileReader<K,V> implements FileReader<Pair<K,V>> { reader.sync(position); ready = false; } - + @Override public boolean pastSync(long position) throws IOException { return reader.getPosition() >= position && reader.syncSeen(); } @@ -179,7 +179,7 @@ public class SequenceFileReader<K,V> implements FileReader<Pair<K,V>> { private static class WritableData extends ReflectData { private static final WritableData INSTANCE = new WritableData(); protected WritableData() {} - + /** Return the singleton instance. */ public static WritableData get() { return INSTANCE; } @@ -194,7 +194,7 @@ public class SequenceFileReader<K,V> implements FileReader<Pair<K,V>> { private interface Converter<T> { T convert(Writable o); } - + private static final Map<Type,Converter> WRITABLE_CONVERTERS = new HashMap<Type,Converter>(); static { http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/SequenceFileRecordReader.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/SequenceFileRecordReader.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/SequenceFileRecordReader.java index 693f34e..bb014a3 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/SequenceFileRecordReader.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/SequenceFileRecordReader.java @@ -31,6 +31,6 @@ public class SequenceFileRecordReader<K,V> extends AvroRecordReader<Pair<K,V>> { super(new SequenceFileReader<K,V>(split.getPath().toUri(), job), split); } - + } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherData.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherData.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherData.java index 6365745..f4620ca 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherData.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherData.java @@ -33,7 +33,7 @@ class TetherData { /** Set the count of records in the buffer. Used for task input only. */ public void count(int count) { this.count = count; } - + /** Return the buffer. */ public ByteBuffer buffer() { return buffer; } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherInputFormat.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherInputFormat.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherInputFormat.java index de0ee26..e680324 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherInputFormat.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherInputFormat.java @@ -36,7 +36,7 @@ import org.apache.avro.mapred.AvroOutputFormat; /** * An {@link org.apache.hadoop.mapred.InputFormat} for tethered Avro input. - * + * * By default, when pointed at a directory, this will silently skip over any * files in it that do not have .avro extension. To instead include all files, * set the avro.mapred.ignore.inputs.without.extension property to false. http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherJob.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherJob.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherJob.java index 169699f..1c16618 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherJob.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherJob.java @@ -42,7 +42,7 @@ public class TetherJob extends Configured { public static final String TETHER_EXEC_ARGS="avro.tether.executable_args"; public static final String TETHER_EXEC_CACHED="avro.tether.executable_cached"; public static final String TETHER_PROTOCOL="avro.tether.protocol"; - + /** Get the URI of the application's executable. */ public static URI getExecutable(JobConf job) { try { @@ -51,15 +51,15 @@ public class TetherJob extends Configured { throw new RuntimeException(e); } } - + /** Set the URI for the application's executable. Normally this in HDFS. */ public static void setExecutable(JobConf job, File executable) { setExecutable(job,executable, new ArrayList<String>(),false); } - + /** - * Set the URI for the application's executable (i.e the program to run in a subprocess - * and provides the mapper/reducer). + * Set the URI for the application's executable (i.e the program to run in a subprocess + * and provides the mapper/reducer). * @param job - Job * @param executable - The URI of the executable * @param args - List of additional arguments; Null if no arguments @@ -114,7 +114,7 @@ public class TetherJob extends Configured { setupTetherJob(conf); return new JobClient(conf).submitJob(conf); } - + /** * Determines which transport protocol (e.g http or sasl) used to communicate * between the parent and subprocess @@ -147,7 +147,7 @@ public class TetherJob extends Configured { // set the map output key class to TetherData job.setMapOutputKeyClass(TetherData.class); - + // if protocol isn't set if (job.getStrings(TETHER_PROTOCOL)==null) { job.set(TETHER_PROTOCOL, "sasl"); http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeyComparator.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeyComparator.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeyComparator.java index f1b74b0..3ecdb46 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeyComparator.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeyComparator.java @@ -52,7 +52,7 @@ class TetherKeyComparator @Override public int compare(TetherData x, TetherData y) { ByteBuffer b1 = x.buffer(), b2 = y.buffer(); - int diff = BinaryData.compare(b1.array(), b1.position(), + int diff = BinaryData.compare(b1.array(), b1.position(), b2.array(), b2.position(), schema); return diff == 0 ? -1 : diff; http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java index b91053e..7dbd6fe 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java @@ -39,20 +39,20 @@ class TetherKeySerialization public boolean accept(Class<?> c) { return TetherData.class.isAssignableFrom(c); } - + public Deserializer<TetherData> getDeserializer(Class<TetherData> c) { return new TetherDataDeserializer(); } - + private static final DecoderFactory FACTORY = DecoderFactory.get(); private class TetherDataDeserializer implements Deserializer<TetherData> { private BinaryDecoder decoder; - + public void open(InputStream in) { this.decoder = FACTORY.directBinaryDecoder(in, decoder); } - + public TetherData deserialize(TetherData datum) throws IOException { if (datum == null) datum = new TetherData(); datum.buffer(decoder.readBytes(datum.buffer())); @@ -63,7 +63,7 @@ class TetherKeySerialization decoder.inputStream().close(); } } - + public Serializer<TetherData> getSerializer(Class<TetherData> c) { return new TetherDataSerializer(); } @@ -72,7 +72,7 @@ class TetherKeySerialization private OutputStream out; private BinaryEncoder encoder; - + public void open(OutputStream out) { this.out = out; this.encoder = EncoderFactory.get().directBinaryEncoder(out, encoder); http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherMapRunner.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherMapRunner.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherMapRunner.java index c8b335f..04c7f20 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherMapRunner.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherMapRunner.java @@ -56,10 +56,10 @@ class TetherMapRunner // configure it LOG.info("send configure to subprocess for map task"); process.inputClient.configure - (TaskType.MAP, + (TaskType.MAP, job.get(AvroJob.INPUT_SCHEMA), AvroJob.getMapOutputSchema(job).toString()); - + LOG.info("send partitions to subprocess for map task"); process.inputClient.partitions(job.getNumReduceTasks()); @@ -91,5 +91,5 @@ class TetherMapRunner process.close(); } } - + } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherOutputFormat.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherOutputFormat.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherOutputFormat.java index 8365938..7206947 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherOutputFormat.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherOutputFormat.java @@ -53,7 +53,7 @@ class TetherOutputFormat throws IOException { Schema schema = AvroJob.getOutputSchema(job); - + final DataFileWriter writer = new DataFileWriter(new GenericDatumWriter()); if (FileOutputFormat.getCompressOutput(job)) { http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherPartitioner.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherPartitioner.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherPartitioner.java index ff0c619..eae1722 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherPartitioner.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherPartitioner.java @@ -29,7 +29,7 @@ import org.apache.avro.io.BinaryData; import org.apache.avro.mapred.AvroJob; class TetherPartitioner implements Partitioner<TetherData, NullWritable> { - + private static final ThreadLocal<Integer> CACHE = new ThreadLocal<Integer>(); private Schema schema; http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherRecordReader.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherRecordReader.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherRecordReader.java index 33c06a8..2b8240c 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherRecordReader.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherRecordReader.java @@ -55,9 +55,9 @@ class TetherRecordReader public Schema getSchema() { return reader.getSchema(); } public TetherData createKey() { return new TetherData(); } - + public NullWritable createValue() { return NullWritable.get(); } - + public boolean next(TetherData data, NullWritable ignore) throws IOException { if (!reader.hasNext() || reader.pastSync(end)) @@ -66,7 +66,7 @@ class TetherRecordReader data.count((int)reader.getBlockCount()); return true; } - + public float getProgress() throws IOException { if (end == start) { return 0.0f; @@ -74,11 +74,11 @@ class TetherRecordReader return Math.min(1.0f, (in.tell() - start) / (float)(end - start)); } } - + public long getPos() throws IOException { return in.tell(); } public void close() throws IOException { reader.close(); } - + } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherReducer.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherReducer.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherReducer.java index 35b4231..0647832 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherReducer.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherReducer.java @@ -40,7 +40,7 @@ class TetherReducer this.job = job; } - public void reduce(TetherData datum, Iterator<NullWritable> ignore, + public void reduce(TetherData datum, Iterator<NullWritable> ignore, OutputCollector<TetherData, NullWritable> collector, Reporter reporter) throws IOException { try { http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetheredProcess.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetheredProcess.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetheredProcess.java index 8ad8e8b..142905a 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetheredProcess.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetheredProcess.java @@ -101,7 +101,7 @@ class TetheredProcess { } outputServer.start(); - + // start sub-process, connecting back to server this.subprocess = startSubprocess(job); @@ -227,5 +227,5 @@ class TetheredProcess { builder.environment().putAll(env); return builder.start(); } - + } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java ---------------------------------------------------------------------- diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java b/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java index ce7bc58..e403123 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java @@ -67,7 +67,7 @@ public class AvroKeyRecordWriter<T> extends RecordWriter<AvroKey<T>, NullWritabl */ public AvroKeyRecordWriter(Schema writerSchema, GenericData dataModel, CodecFactory compressionCodec, OutputStream outputStream) throws IOException { - this(writerSchema, dataModel, compressionCodec, outputStream, + this(writerSchema, dataModel, compressionCodec, outputStream, DataFileConstants.DEFAULT_SYNC_INTERVAL); } @@ -82,10 +82,10 @@ public class AvroKeyRecordWriter<T> extends RecordWriter<AvroKey<T>, NullWritabl public void close(TaskAttemptContext context) throws IOException { mAvroFileWriter.close(); } - + /** {@inheritDoc} */ @Override public long sync() throws IOException { return mAvroFileWriter.sync(); - } + } }
