Repository: spark
Updated Branches:
  refs/heads/master 44c1e1ab1 -> 5ae20cf1a


Revert "[SPARK-25408] Move to mode ideomatic Java8"

This reverts commit 44c1e1ab1c26560371831b1593f96f30344c4363.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5ae20cf1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5ae20cf1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5ae20cf1

Branch: refs/heads/master
Commit: 5ae20cf1a96a33f5de4435fcfb55914d64466525
Parents: 44c1e1a
Author: Wenchen Fan <[email protected]>
Authored: Fri Oct 5 11:03:41 2018 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Fri Oct 5 11:03:41 2018 +0800

----------------------------------------------------------------------
 .../spark/util/kvstore/KVStoreSerializer.java   |  10 +-
 .../apache/spark/util/kvstore/LevelDBSuite.java |   2 +-
 .../network/ChunkFetchIntegrationSuite.java     |  53 +++++----
 .../shuffle/ShuffleIndexInformation.java        |   8 +-
 .../ExternalShuffleBlockResolverSuite.java      |  22 ++--
 .../ExternalShuffleIntegrationSuite.java        |  51 +++++----
 .../shuffle/ExternalShuffleSecuritySuite.java   |  15 +--
 .../spark/util/sketch/CountMinSketch.java       |   7 +-
 .../spark/util/sketch/CountMinSketchImpl.java   |   8 +-
 .../apache/spark/io/ReadAheadInputStream.java   | 102 ++++++++---------
 .../sort/BypassMergeSortShuffleWriter.java      |   6 +-
 .../shuffle/sort/ShuffleExternalSorter.java     |  63 ++++++-----
 .../java/org/apache/spark/JavaJdbcRDDSuite.java |  28 +++--
 .../shuffle/sort/UnsafeShuffleWriterSuite.java  |  14 +--
 .../test/org/apache/spark/JavaAPISuite.java     |  27 ++---
 .../expressions/RowBasedKeyValueBatch.java      |   3 +-
 .../expressions/RowBasedKeyValueBatchSuite.java | 110 ++++++++++++-------
 .../org/apache/hive/service/cli/CLIService.java |   9 +-
 .../service/cli/operation/OperationManager.java |   8 +-
 19 files changed, 303 insertions(+), 243 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
index 771a954..bd8d948 100644
--- 
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
+++ 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
@@ -54,8 +54,11 @@ public class KVStoreSerializer {
       return ((String) o).getBytes(UTF_8);
     } else {
       ByteArrayOutputStream bytes = new ByteArrayOutputStream();
-      try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
+      GZIPOutputStream out = new GZIPOutputStream(bytes);
+      try {
         mapper.writeValue(out, o);
+      } finally {
+        out.close();
       }
       return bytes.toByteArray();
     }
@@ -66,8 +69,11 @@ public class KVStoreSerializer {
     if (klass.equals(String.class)) {
       return (T) new String(data, UTF_8);
     } else {
-      try (GZIPInputStream in = new GZIPInputStream(new 
ByteArrayInputStream(data))) {
+      GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data));
+      try {
         return mapper.readValue(in, klass);
+      } finally {
+        in.close();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java 
b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
index 39a952f..205f7df 100644
--- 
a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
+++ 
b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
@@ -217,7 +217,7 @@ public class LevelDBSuite {
   public void testNegativeIndexValues() throws Exception {
     List<Integer> expected = Arrays.asList(-100, -50, 0, 50, 100);
 
-    expected.forEach(i -> {
+    expected.stream().forEach(i -> {
       try {
         db.write(createCustomType1(i));
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
----------------------------------------------------------------------
diff --git 
a/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
 
b/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
index 9656a9a..824482a 100644
--- 
a/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
+++ 
b/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
@@ -143,38 +143,37 @@ public class ChunkFetchIntegrationSuite {
   }
 
   private FetchResult fetchChunks(List<Integer> chunkIndices) throws Exception 
{
-    final FetchResult res = new FetchResult();
-
-    try (TransportClient client = 
clientFactory.createClient(TestUtils.getLocalHost(), server.getPort())) {
-      final Semaphore sem = new Semaphore(0);
+    TransportClient client = 
clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
+    final Semaphore sem = new Semaphore(0);
 
-      res.successChunks = Collections.synchronizedSet(new HashSet<Integer>());
-      res.failedChunks = Collections.synchronizedSet(new HashSet<Integer>());
-      res.buffers = Collections.synchronizedList(new 
LinkedList<ManagedBuffer>());
-
-      ChunkReceivedCallback callback = new ChunkReceivedCallback() {
-        @Override
-        public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
-          buffer.retain();
-          res.successChunks.add(chunkIndex);
-          res.buffers.add(buffer);
-          sem.release();
-        }
-
-        @Override
-        public void onFailure(int chunkIndex, Throwable e) {
-          res.failedChunks.add(chunkIndex);
-          sem.release();
-        }
-      };
+    final FetchResult res = new FetchResult();
+    res.successChunks = Collections.synchronizedSet(new HashSet<Integer>());
+    res.failedChunks = Collections.synchronizedSet(new HashSet<Integer>());
+    res.buffers = Collections.synchronizedList(new 
LinkedList<ManagedBuffer>());
 
-      for (int chunkIndex : chunkIndices) {
-        client.fetchChunk(STREAM_ID, chunkIndex, callback);
+    ChunkReceivedCallback callback = new ChunkReceivedCallback() {
+      @Override
+      public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
+        buffer.retain();
+        res.successChunks.add(chunkIndex);
+        res.buffers.add(buffer);
+        sem.release();
       }
-      if (!sem.tryAcquire(chunkIndices.size(), 5, TimeUnit.SECONDS)) {
-        fail("Timeout getting response from the server");
+
+      @Override
+      public void onFailure(int chunkIndex, Throwable e) {
+        res.failedChunks.add(chunkIndex);
+        sem.release();
       }
+    };
+
+    for (int chunkIndex : chunkIndices) {
+      client.fetchChunk(STREAM_ID, chunkIndex, callback);
+    }
+    if (!sem.tryAcquire(chunkIndices.size(), 5, TimeUnit.SECONDS)) {
+      fail("Timeout getting response from the server");
     }
+    client.close();
     return res;
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
----------------------------------------------------------------------
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
index 371149b..386738e 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
@@ -37,8 +37,14 @@ public class ShuffleIndexInformation {
     size = (int)indexFile.length();
     ByteBuffer buffer = ByteBuffer.allocate(size);
     offsets = buffer.asLongBuffer();
-    try (DataInputStream dis = new 
DataInputStream(Files.newInputStream(indexFile.toPath()))) {
+    DataInputStream dis = null;
+    try {
+      dis = new DataInputStream(Files.newInputStream(indexFile.toPath()));
       dis.readFully(buffer.array());
+    } finally {
+      if (dis != null) {
+        dis.close();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
----------------------------------------------------------------------
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
index 44bc25a..d2072a5 100644
--- 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
+++ 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
@@ -98,15 +98,19 @@ public class ExternalShuffleBlockResolverSuite {
     resolver.registerExecutor("app0", "exec0",
       dataContext.createExecutorInfo(SORT_MANAGER));
 
-    try (InputStream block0Stream = resolver.getBlockData("app0", "exec0", 0, 
0, 0).createInputStream()) {
-      String block0 = CharStreams.toString(new InputStreamReader(block0Stream, 
StandardCharsets.UTF_8));
-      assertEquals(sortBlock0, block0);
-    }
-
-    try (InputStream block1Stream = resolver.getBlockData("app0", "exec0", 0, 
0, 1).createInputStream()) {
-      String block1 = CharStreams.toString(new InputStreamReader(block1Stream, 
StandardCharsets.UTF_8));
-      assertEquals(sortBlock1, block1);
-    }
+    InputStream block0Stream =
+      resolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream();
+    String block0 = CharStreams.toString(
+        new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
+    block0Stream.close();
+    assertEquals(sortBlock0, block0);
+
+    InputStream block1Stream =
+      resolver.getBlockData("app0", "exec0", 0, 0, 1).createInputStream();
+    String block1 = CharStreams.toString(
+        new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
+    block1Stream.close();
+    assertEquals(sortBlock1, block1);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
----------------------------------------------------------------------
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index 41bee40..a6a1b8d 100644
--- 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -133,38 +133,37 @@ public class ExternalShuffleIntegrationSuite {
 
     final Semaphore requestsRemaining = new Semaphore(0);
 
-    try (ExternalShuffleClient client = new ExternalShuffleClient(clientConf, 
null, false, 5000)) {
-      client.init(APP_ID);
-      client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
-        new BlockFetchingListener() {
-          @Override
-          public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
-            synchronized (this) {
-              if (!res.successBlocks.contains(blockId) && 
!res.failedBlocks.contains(blockId)) {
-                data.retain();
-                res.successBlocks.add(blockId);
-                res.buffers.add(data);
-                requestsRemaining.release();
-              }
+    ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, 
false, 5000);
+    client.init(APP_ID);
+    client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
+      new BlockFetchingListener() {
+        @Override
+        public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
+          synchronized (this) {
+            if (!res.successBlocks.contains(blockId) && 
!res.failedBlocks.contains(blockId)) {
+              data.retain();
+              res.successBlocks.add(blockId);
+              res.buffers.add(data);
+              requestsRemaining.release();
             }
           }
-
-          @Override
-          public void onBlockFetchFailure(String blockId, Throwable exception) 
{
-            synchronized (this) {
-              if (!res.successBlocks.contains(blockId) && 
!res.failedBlocks.contains(blockId)) {
-                res.failedBlocks.add(blockId);
-                requestsRemaining.release();
-              }
+        }
+
+        @Override
+        public void onBlockFetchFailure(String blockId, Throwable exception) {
+          synchronized (this) {
+            if (!res.successBlocks.contains(blockId) && 
!res.failedBlocks.contains(blockId)) {
+              res.failedBlocks.add(blockId);
+              requestsRemaining.release();
             }
           }
-        }, null
-      );
+        }
+      }, null);
 
-      if (!requestsRemaining.tryAcquire(blockIds.length, 5, TimeUnit.SECONDS)) 
{
-        fail("Timeout getting response from the server");
-      }
+    if (!requestsRemaining.tryAcquire(blockIds.length, 5, TimeUnit.SECONDS)) {
+      fail("Timeout getting response from the server");
     }
+    client.close();
     return res;
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
----------------------------------------------------------------------
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
index dafefaa..16bad9f 100644
--- 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
+++ 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
@@ -96,13 +96,14 @@ public class ExternalShuffleSecuritySuite {
         ImmutableMap.of("spark.authenticate.enableSaslEncryption", "true")));
     }
 
-    try (ExternalShuffleClient client =
-        new ExternalShuffleClient(testConf, new TestSecretKeyHolder(appId, 
secretKey), true, 5000)) {
-      client.init(appId);
-      // Registration either succeeds or throws an exception.
-      client.registerWithShuffleServer(TestUtils.getLocalHost(), 
server.getPort(), "exec0",
-        new ExecutorShuffleInfo(new String[0], 0, 
"org.apache.spark.shuffle.sort.SortShuffleManager"));
-    }
+    ExternalShuffleClient client =
+      new ExternalShuffleClient(testConf, new TestSecretKeyHolder(appId, 
secretKey), true, 5000);
+    client.init(appId);
+    // Registration either succeeds or throws an exception.
+    client.registerWithShuffleServer(TestUtils.getLocalHost(), 
server.getPort(), "exec0",
+      new ExecutorShuffleInfo(new String[0], 0,
+        "org.apache.spark.shuffle.sort.SortShuffleManager"));
+    client.close();
   }
 
   /** Provides a secret key holder which always returns the given secret key, 
for a single appId. */

http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java
----------------------------------------------------------------------
diff --git 
a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java 
b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java
index 06a248c..f7c22dd 100644
--- 
a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java
+++ 
b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java
@@ -191,9 +191,10 @@ public abstract class CountMinSketch {
    * Reads in a {@link CountMinSketch} from a byte array.
    */
   public static CountMinSketch readFrom(byte[] bytes) throws IOException {
-    try (InputStream in = new ByteArrayInputStream(bytes)) {
-      return readFrom(in);
-    }
+    InputStream in = new ByteArrayInputStream(bytes);
+    CountMinSketch cms = readFrom(in);
+    in.close();
+    return cms;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java
----------------------------------------------------------------------
diff --git 
a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java
 
b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java
index b78c167..fd1906d 100644
--- 
a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java
+++ 
b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java
@@ -322,10 +322,10 @@ class CountMinSketchImpl extends CountMinSketch 
implements Serializable {
 
   @Override
   public byte[] toByteArray() throws IOException {
-    try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-      writeTo(out);
-      return out.toByteArray();
-    }
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    writeTo(out);
+    out.close();
+    return out.toByteArray();
   }
 
   public static CountMinSketchImpl readFrom(InputStream in) throws IOException 
{

http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
index 2e18715..0cced9e 100644
--- a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
+++ b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
@@ -135,58 +135,62 @@ public class ReadAheadInputStream extends InputStream {
     } finally {
       stateChangeLock.unlock();
     }
-    executorService.execute(() -> {
-      stateChangeLock.lock();
-      try {
-        if (isClosed) {
-          readInProgress = false;
-          return;
-        }
-        // Flip this so that the close method will not close the underlying 
input stream when we
-        // are reading.
-        isReading = true;
-      } finally {
-        stateChangeLock.unlock();
-      }
+    executorService.execute(new Runnable() {
 
-      // Please note that it is safe to release the lock and read into the 
read ahead buffer
-      // because either of following two conditions will hold - 1. The active 
buffer has
-      // data available to read so the reader will not read from the read 
ahead buffer.
-      // 2. This is the first time read is called or the active buffer is 
exhausted,
-      // in that case the reader waits for this async read to complete.
-      // So there is no race condition in both the situations.
-      int read = 0;
-      int off = 0, len = arr.length;
-      Throwable exception = null;
-      try {
-        // try to fill the read ahead buffer.
-        // if a reader is waiting, possibly return early.
-        do {
-          read = underlyingInputStream.read(arr, off, len);
-          if (read <= 0) break;
-          off += read;
-          len -= read;
-        } while (len > 0 && !isWaiting.get());
-      } catch (Throwable ex) {
-        exception = ex;
-        if (ex instanceof Error) {
-          // `readException` may not be reported to the user. Rethrow Error to 
make sure at least
-          // The user can see Error in UncaughtExceptionHandler.
-          throw (Error) ex;
-        }
-      } finally {
+      @Override
+      public void run() {
         stateChangeLock.lock();
-        readAheadBuffer.limit(off);
-        if (read < 0 || (exception instanceof EOFException)) {
-          endOfStream = true;
-        } else if (exception != null) {
-          readAborted = true;
-          readException = exception;
+        try {
+          if (isClosed) {
+            readInProgress = false;
+            return;
+          }
+          // Flip this so that the close method will not close the underlying 
input stream when we
+          // are reading.
+          isReading = true;
+        } finally {
+          stateChangeLock.unlock();
+        }
+
+        // Please note that it is safe to release the lock and read into the 
read ahead buffer
+        // because either of following two conditions will hold - 1. The 
active buffer has
+        // data available to read so the reader will not read from the read 
ahead buffer.
+        // 2. This is the first time read is called or the active buffer is 
exhausted,
+        // in that case the reader waits for this async read to complete.
+        // So there is no race condition in both the situations.
+        int read = 0;
+        int off = 0, len = arr.length;
+        Throwable exception = null;
+        try {
+          // try to fill the read ahead buffer.
+          // if a reader is waiting, possibly return early.
+          do {
+            read = underlyingInputStream.read(arr, off, len);
+            if (read <= 0) break;
+            off += read;
+            len -= read;
+          } while (len > 0 && !isWaiting.get());
+        } catch (Throwable ex) {
+          exception = ex;
+          if (ex instanceof Error) {
+            // `readException` may not be reported to the user. Rethrow Error 
to make sure at least
+            // The user can see Error in UncaughtExceptionHandler.
+            throw (Error) ex;
+          }
+        } finally {
+          stateChangeLock.lock();
+          readAheadBuffer.limit(off);
+          if (read < 0 || (exception instanceof EOFException)) {
+            endOfStream = true;
+          } else if (exception != null) {
+            readAborted = true;
+            readException = exception;
+          }
+          readInProgress = false;
+          signalAsyncReadComplete();
+          stateChangeLock.unlock();
+          closeUnderlyingInputStreamIfNecessary();
         }
-        readInProgress = false;
-        signalAsyncReadComplete();
-        stateChangeLock.unlock();
-        closeUnderlyingInputStreamIfNecessary();
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
 
b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index abe027f..323a5d3 100644
--- 
a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ 
b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -152,9 +152,9 @@ final class BypassMergeSortShuffleWriter<K, V> extends 
ShuffleWriter<K, V> {
     }
 
     for (int i = 0; i < numPartitions; i++) {
-      try (final DiskBlockObjectWriter writer = partitionWriters[i]) {
-        partitionWriterSegments[i] = writer.commitAndGet();
-      }
+      final DiskBlockObjectWriter writer = partitionWriters[i];
+      partitionWriterSegments[i] = writer.commitAndGet();
+      writer.close();
     }
 
     File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);

http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index ad66074..c7d2db4 100644
--- 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -181,43 +181,42 @@ final class ShuffleExternalSorter extends MemoryConsumer {
     // around this, we pass a dummy no-op serializer.
     final SerializerInstance ser = DummySerializerInstance.INSTANCE;
 
-    int currentPartition = -1;
-    final FileSegment committedSegment;
-    try (final DiskBlockObjectWriter writer =
-        blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, 
writeMetricsToUse)) {
-
-      final int uaoSize = UnsafeAlignedOffset.getUaoSize();
-      while (sortedRecords.hasNext()) {
-        sortedRecords.loadNext();
-        final int partition = 
sortedRecords.packedRecordPointer.getPartitionId();
-        assert (partition >= currentPartition);
-        if (partition != currentPartition) {
-          // Switch to the new partition
-          if (currentPartition != -1) {
-            final FileSegment fileSegment = writer.commitAndGet();
-            spillInfo.partitionLengths[currentPartition] = 
fileSegment.length();
-          }
-          currentPartition = partition;
-        }
+    final DiskBlockObjectWriter writer =
+      blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, 
writeMetricsToUse);
 
-        final long recordPointer = 
sortedRecords.packedRecordPointer.getRecordPointer();
-        final Object recordPage = taskMemoryManager.getPage(recordPointer);
-        final long recordOffsetInPage = 
taskMemoryManager.getOffsetInPage(recordPointer);
-        int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, 
recordOffsetInPage);
-        long recordReadPosition = recordOffsetInPage + uaoSize; // skip over 
record length
-        while (dataRemaining > 0) {
-          final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
-          Platform.copyMemory(
-            recordPage, recordReadPosition, writeBuffer, 
Platform.BYTE_ARRAY_OFFSET, toTransfer);
-          writer.write(writeBuffer, 0, toTransfer);
-          recordReadPosition += toTransfer;
-          dataRemaining -= toTransfer;
+    int currentPartition = -1;
+    final int uaoSize = UnsafeAlignedOffset.getUaoSize();
+    while (sortedRecords.hasNext()) {
+      sortedRecords.loadNext();
+      final int partition = sortedRecords.packedRecordPointer.getPartitionId();
+      assert (partition >= currentPartition);
+      if (partition != currentPartition) {
+        // Switch to the new partition
+        if (currentPartition != -1) {
+          final FileSegment fileSegment = writer.commitAndGet();
+          spillInfo.partitionLengths[currentPartition] = fileSegment.length();
         }
-        writer.recordWritten();
+        currentPartition = partition;
       }
 
-      committedSegment = writer.commitAndGet();
+      final long recordPointer = 
sortedRecords.packedRecordPointer.getRecordPointer();
+      final Object recordPage = taskMemoryManager.getPage(recordPointer);
+      final long recordOffsetInPage = 
taskMemoryManager.getOffsetInPage(recordPointer);
+      int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, 
recordOffsetInPage);
+      long recordReadPosition = recordOffsetInPage + uaoSize; // skip over 
record length
+      while (dataRemaining > 0) {
+        final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
+        Platform.copyMemory(
+          recordPage, recordReadPosition, writeBuffer, 
Platform.BYTE_ARRAY_OFFSET, toTransfer);
+        writer.write(writeBuffer, 0, toTransfer);
+        recordReadPosition += toTransfer;
+        dataRemaining -= toTransfer;
+      }
+      writer.recordWritten();
     }
+
+    final FileSegment committedSegment = writer.commitAndGet();
+    writer.close();
     // If `writeSortedFile()` was called from `closeAndGetSpills()` and no 
records were inserted,
     // then the file might be empty. Note that it might be better to avoid 
calling
     // writeSortedFile() in that case.

http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java 
b/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java
index c35661e..a6589d2 100644
--- a/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java
+++ b/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java
@@ -39,26 +39,30 @@ public class JavaJdbcRDDSuite implements Serializable {
     sc = new JavaSparkContext("local", "JavaAPISuite");
 
     Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+    Connection connection =
+      
DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;create=true");
 
-    try (Connection connection = 
DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;create=true"))
 {
-
-      try (Statement create = connection.createStatement()) {
-        create.execute(
-          "CREATE TABLE FOO(ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY 
(START WITH 1, INCREMENT BY 1)," +
-            "DATA INTEGER)");
-      }
+    try {
+      Statement create = connection.createStatement();
+      create.execute(
+        "CREATE TABLE FOO(" +
+        "ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, 
INCREMENT BY 1)," +
+        "DATA INTEGER)");
+      create.close();
 
-      try (PreparedStatement insert = connection.prepareStatement("INSERT INTO 
FOO(DATA) VALUES(?)")) {
-        for (int i = 1; i <= 100; i++) {
-          insert.setInt(1, i * 2);
-          insert.executeUpdate();
-        }
+      PreparedStatement insert = connection.prepareStatement("INSERT INTO 
FOO(DATA) VALUES(?)");
+      for (int i = 1; i <= 100; i++) {
+        insert.setInt(1, i * 2);
+        insert.executeUpdate();
       }
+      insert.close();
     } catch (SQLException e) {
       // If table doesn't exist...
       if (e.getSQLState().compareTo("X0Y32") != 0) {
         throw e;
       }
+    } finally {
+      connection.close();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
 
b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index a07d0e8..0d5c5ea 100644
--- 
a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ 
b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -186,14 +186,14 @@ public class UnsafeShuffleWriterSuite {
         if (conf.getBoolean("spark.shuffle.compress", true)) {
           in = 
CompressionCodec$.MODULE$.createCodec(conf).compressedInputStream(in);
         }
-        try (DeserializationStream recordsStream = 
serializer.newInstance().deserializeStream(in)) {
-          Iterator<Tuple2<Object, Object>> records = 
recordsStream.asKeyValueIterator();
-          while (records.hasNext()) {
-            Tuple2<Object, Object> record = records.next();
-            assertEquals(i, hashPartitioner.getPartition(record._1()));
-            recordsList.add(record);
-          }
+        DeserializationStream recordsStream = 
serializer.newInstance().deserializeStream(in);
+        Iterator<Tuple2<Object, Object>> records = 
recordsStream.asKeyValueIterator();
+        while (records.hasNext()) {
+          Tuple2<Object, Object> record = records.next();
+          assertEquals(i, hashPartitioner.getPartition(record._1()));
+          recordsList.add(record);
         }
+        recordsStream.close();
         startOffset += partitionSize;
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java 
b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
index 3992ab7..01b5fb7 100644
--- a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
@@ -997,10 +997,10 @@ public class JavaAPISuite implements Serializable {
 
     FileOutputStream fos1 = new FileOutputStream(file1);
 
-    try (FileChannel channel1 = fos1.getChannel()) {
-      ByteBuffer bbuf = ByteBuffer.wrap(content1);
-      channel1.write(bbuf);
-    }
+    FileChannel channel1 = fos1.getChannel();
+    ByteBuffer bbuf = ByteBuffer.wrap(content1);
+    channel1.write(bbuf);
+    channel1.close();
     JavaPairRDD<String, PortableDataStream> readRDD = 
sc.binaryFiles(tempDirName, 3);
     List<Tuple2<String, PortableDataStream>> result = readRDD.collect();
     for (Tuple2<String, PortableDataStream> res : result) {
@@ -1018,10 +1018,10 @@ public class JavaAPISuite implements Serializable {
 
     FileOutputStream fos1 = new FileOutputStream(file1);
 
-    try (FileChannel channel1 = fos1.getChannel()) {
-      ByteBuffer bbuf = ByteBuffer.wrap(content1);
-      channel1.write(bbuf);
-    }
+    FileChannel channel1 = fos1.getChannel();
+    ByteBuffer bbuf = ByteBuffer.wrap(content1);
+    channel1.write(bbuf);
+    channel1.close();
 
     JavaPairRDD<String, PortableDataStream> readRDD = 
sc.binaryFiles(tempDirName).cache();
     readRDD.foreach(pair -> pair._2().toArray()); // force the file to read
@@ -1042,12 +1042,13 @@ public class JavaAPISuite implements Serializable {
 
     FileOutputStream fos1 = new FileOutputStream(file1);
 
-    try (FileChannel channel1 = fos1.getChannel()) {
-      for (int i = 0; i < numOfCopies; i++) {
-        ByteBuffer bbuf = ByteBuffer.wrap(content1);
-        channel1.write(bbuf);
-      }
+    FileChannel channel1 = fos1.getChannel();
+
+    for (int i = 0; i < numOfCopies; i++) {
+      ByteBuffer bbuf = ByteBuffer.wrap(content1);
+      channel1.write(bbuf);
     }
+    channel1.close();
 
     JavaRDD<byte[]> readRDD = sc.binaryRecords(tempDirName, content1.length);
     assertEquals(numOfCopies,readRDD.count());

http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java
index 4605138..551443a 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java
@@ -16,7 +16,6 @@
  */
 package org.apache.spark.sql.catalyst.expressions;
 
-import java.io.Closeable;
 import java.io.IOException;
 
 import org.apache.spark.memory.MemoryConsumer;
@@ -46,7 +45,7 @@ import org.slf4j.LoggerFactory;
  * page requires an average size for key value pairs to be larger than 1024 
bytes.
  *
  */
-public abstract class RowBasedKeyValueBatch extends MemoryConsumer implements 
Closeable {
+public abstract class RowBasedKeyValueBatch extends MemoryConsumer {
   protected final Logger logger = 
LoggerFactory.getLogger(RowBasedKeyValueBatch.class);
 
   private static final int DEFAULT_CAPACITY = 1 << 16;

http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java
 
b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java
index ef02f0a..2da8711 100644
--- 
a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java
+++ 
b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java
@@ -123,8 +123,9 @@ public class RowBasedKeyValueBatchSuite {
 
   @Test
   public void emptyBatch() throws Exception {
-    try (RowBasedKeyValueBatch batch = 
RowBasedKeyValueBatch.allocate(keySchema,
-            valueSchema, taskMemoryManager, DEFAULT_CAPACITY)) {
+    RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema,
+            valueSchema, taskMemoryManager, DEFAULT_CAPACITY);
+    try {
       Assert.assertEquals(0, batch.numRows());
       try {
         batch.getKeyRow(-1);
@@ -151,24 +152,31 @@ public class RowBasedKeyValueBatchSuite {
         // Expected exception; do nothing.
       }
       Assert.assertFalse(batch.rowIterator().next());
+    } finally {
+      batch.close();
     }
   }
 
   @Test
-  public void batchType() {
-    try (RowBasedKeyValueBatch batch1 = 
RowBasedKeyValueBatch.allocate(keySchema,
+  public void batchType() throws Exception {
+    RowBasedKeyValueBatch batch1 = RowBasedKeyValueBatch.allocate(keySchema,
+            valueSchema, taskMemoryManager, DEFAULT_CAPACITY);
+    RowBasedKeyValueBatch batch2 = 
RowBasedKeyValueBatch.allocate(fixedKeySchema,
             valueSchema, taskMemoryManager, DEFAULT_CAPACITY);
-         RowBasedKeyValueBatch batch2 = 
RowBasedKeyValueBatch.allocate(fixedKeySchema,
-            valueSchema, taskMemoryManager, DEFAULT_CAPACITY)) {
+    try {
       Assert.assertEquals(batch1.getClass(), 
VariableLengthRowBasedKeyValueBatch.class);
       Assert.assertEquals(batch2.getClass(), 
FixedLengthRowBasedKeyValueBatch.class);
+    } finally {
+      batch1.close();
+      batch2.close();
     }
   }
 
   @Test
   public void setAndRetrieve() {
-    try (RowBasedKeyValueBatch batch = 
RowBasedKeyValueBatch.allocate(keySchema,
-            valueSchema, taskMemoryManager, DEFAULT_CAPACITY)) {
+    RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema,
+            valueSchema, taskMemoryManager, DEFAULT_CAPACITY);
+    try {
       UnsafeRow ret1 = appendRow(batch, makeKeyRow(1, "A"), makeValueRow(1, 
1));
       Assert.assertTrue(checkValue(ret1, 1, 1));
       UnsafeRow ret2 = appendRow(batch, makeKeyRow(2, "B"), makeValueRow(2, 
2));
@@ -196,27 +204,33 @@ public class RowBasedKeyValueBatchSuite {
       } catch (AssertionError e) {
         // Expected exception; do nothing.
       }
+    } finally {
+      batch.close();
     }
   }
 
   @Test
   public void setUpdateAndRetrieve() {
-    try (RowBasedKeyValueBatch batch = 
RowBasedKeyValueBatch.allocate(keySchema,
-            valueSchema, taskMemoryManager, DEFAULT_CAPACITY)) {
+    RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema,
+            valueSchema, taskMemoryManager, DEFAULT_CAPACITY);
+    try {
       appendRow(batch, makeKeyRow(1, "A"), makeValueRow(1, 1));
       Assert.assertEquals(1, batch.numRows());
       UnsafeRow retrievedValue = batch.getValueRow(0);
       updateValueRow(retrievedValue, 2, 2);
       UnsafeRow retrievedValue2 = batch.getValueRow(0);
       Assert.assertTrue(checkValue(retrievedValue2, 2, 2));
+    } finally {
+      batch.close();
     }
   }
 
 
   @Test
   public void iteratorTest() throws Exception {
-    try (RowBasedKeyValueBatch batch = 
RowBasedKeyValueBatch.allocate(keySchema,
-            valueSchema, taskMemoryManager, DEFAULT_CAPACITY)) {
+    RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema,
+            valueSchema, taskMemoryManager, DEFAULT_CAPACITY);
+    try {
       appendRow(batch, makeKeyRow(1, "A"), makeValueRow(1, 1));
       appendRow(batch, makeKeyRow(2, "B"), makeValueRow(2, 2));
       appendRow(batch, makeKeyRow(3, "C"), makeValueRow(3, 3));
@@ -239,13 +253,16 @@ public class RowBasedKeyValueBatchSuite {
       Assert.assertTrue(checkKey(key3, 3, "C"));
       Assert.assertTrue(checkValue(value3, 3, 3));
       Assert.assertFalse(iterator.next());
+    } finally {
+      batch.close();
     }
   }
 
   @Test
   public void fixedLengthTest() throws Exception {
-    try (RowBasedKeyValueBatch batch = 
RowBasedKeyValueBatch.allocate(fixedKeySchema,
-            valueSchema, taskMemoryManager, DEFAULT_CAPACITY)) {
+    RowBasedKeyValueBatch batch = 
RowBasedKeyValueBatch.allocate(fixedKeySchema,
+            valueSchema, taskMemoryManager, DEFAULT_CAPACITY);
+    try {
       appendRow(batch, makeKeyRow(11, 11), makeValueRow(1, 1));
       appendRow(batch, makeKeyRow(22, 22), makeValueRow(2, 2));
       appendRow(batch, makeKeyRow(33, 33), makeValueRow(3, 3));
@@ -276,13 +293,16 @@ public class RowBasedKeyValueBatchSuite {
       Assert.assertTrue(checkKey(key3, 33, 33));
       Assert.assertTrue(checkValue(value3, 3, 3));
       Assert.assertFalse(iterator.next());
+    } finally {
+      batch.close();
     }
   }
 
   @Test
   public void appendRowUntilExceedingCapacity() throws Exception {
-    try (RowBasedKeyValueBatch batch = 
RowBasedKeyValueBatch.allocate(keySchema,
-            valueSchema, taskMemoryManager, 10)) {
+    RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema,
+            valueSchema, taskMemoryManager, 10);
+    try {
       UnsafeRow key = makeKeyRow(1, "A");
       UnsafeRow value = makeValueRow(1, 1);
       for (int i = 0; i < 10; i++) {
@@ -301,6 +321,8 @@ public class RowBasedKeyValueBatchSuite {
         Assert.assertTrue(checkValue(value1, 1, 1));
       }
       Assert.assertFalse(iterator.next());
+    } finally {
+      batch.close();
     }
   }
 
@@ -308,8 +330,9 @@ public class RowBasedKeyValueBatchSuite {
   public void appendRowUntilExceedingPageSize() throws Exception {
     // Use default size or spark.buffer.pageSize if specified
     int pageSizeToUse = (int) memoryManager.pageSizeBytes();
-    try (RowBasedKeyValueBatch batch = 
RowBasedKeyValueBatch.allocate(keySchema,
-            valueSchema, taskMemoryManager, pageSizeToUse)) {
+    RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema,
+            valueSchema, taskMemoryManager, pageSizeToUse); //enough capacity
+    try {
       UnsafeRow key = makeKeyRow(1, "A");
       UnsafeRow value = makeValueRow(1, 1);
       int recordLength = 8 + key.getSizeInBytes() + value.getSizeInBytes() + 8;
@@ -333,44 +356,49 @@ public class RowBasedKeyValueBatchSuite {
         Assert.assertTrue(checkValue(value1, 1, 1));
       }
       Assert.assertFalse(iterator.next());
+    } finally {
+      batch.close();
     }
   }
 
   @Test
   public void failureToAllocateFirstPage() throws Exception {
     memoryManager.limit(1024);
-    try (RowBasedKeyValueBatch batch = 
RowBasedKeyValueBatch.allocate(keySchema,
-            valueSchema, taskMemoryManager, DEFAULT_CAPACITY)) {
+    RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema,
+            valueSchema, taskMemoryManager, DEFAULT_CAPACITY);
+    try {
       UnsafeRow key = makeKeyRow(1, "A");
       UnsafeRow value = makeValueRow(11, 11);
       UnsafeRow ret = appendRow(batch, key, value);
       Assert.assertNull(ret);
       Assert.assertFalse(batch.rowIterator().next());
+    } finally {
+      batch.close();
     }
   }
 
   @Test
   public void randomizedTest() {
-    try (RowBasedKeyValueBatch batch = 
RowBasedKeyValueBatch.allocate(keySchema,
-            valueSchema, taskMemoryManager, DEFAULT_CAPACITY)) {
-      int numEntry = 100;
-      long[] expectedK1 = new long[numEntry];
-      String[] expectedK2 = new String[numEntry];
-      long[] expectedV1 = new long[numEntry];
-      long[] expectedV2 = new long[numEntry];
-
-      for (int i = 0; i < numEntry; i++) {
-        long k1 = rand.nextLong();
-        String k2 = getRandomString(rand.nextInt(256));
-        long v1 = rand.nextLong();
-        long v2 = rand.nextLong();
-        appendRow(batch, makeKeyRow(k1, k2), makeValueRow(v1, v2));
-        expectedK1[i] = k1;
-        expectedK2[i] = k2;
-        expectedV1[i] = v1;
-        expectedV2[i] = v2;
-      }
-
+    RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema,
+            valueSchema, taskMemoryManager, DEFAULT_CAPACITY);
+    int numEntry = 100;
+    long[] expectedK1 = new long[numEntry];
+    String[] expectedK2 = new String[numEntry];
+    long[] expectedV1 = new long[numEntry];
+    long[] expectedV2 = new long[numEntry];
+
+    for (int i = 0; i < numEntry; i++) {
+      long k1 = rand.nextLong();
+      String k2 = getRandomString(rand.nextInt(256));
+      long v1 = rand.nextLong();
+      long v2 = rand.nextLong();
+      appendRow(batch, makeKeyRow(k1, k2), makeValueRow(v1, v2));
+      expectedK1[i] = k1;
+      expectedK2[i] = k2;
+      expectedV1[i] = v1;
+      expectedV2[i] = v2;
+    }
+    try {
       for (int j = 0; j < 10000; j++) {
         int rowId = rand.nextInt(numEntry);
         if (rand.nextBoolean()) {
@@ -382,6 +410,8 @@ public class RowBasedKeyValueBatchSuite {
           Assert.assertTrue(checkValue(value, expectedV1[rowId], 
expectedV2[rowId]));
         }
       }
+    } finally {
+      batch.close();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java
 
b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java
index 3cbc2c4..791ddcb 100644
--- 
a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java
+++ 
b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java
@@ -146,11 +146,18 @@ public class CLIService extends CompositeService 
implements ICLIService {
   public synchronized void start() {
     super.start();
     // Initialize and test a connection to the metastore
-    try (IMetaStoreClient metastoreClient = new HiveMetaStoreClient(hiveConf)) 
{
+    IMetaStoreClient metastoreClient = null;
+    try {
+      metastoreClient = new HiveMetaStoreClient(hiveConf);
       metastoreClient.getDatabases("default");
     } catch (Exception e) {
       throw new ServiceException("Unable to connect to MetaStore!", e);
     }
+    finally {
+      if (metastoreClient != null) {
+        metastoreClient.close();
+      }
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java
 
b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java
index 4a8779e..92c340a 100644
--- 
a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java
+++ 
b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java
@@ -197,11 +197,11 @@ public class OperationManager extends AbstractService {
   }
 
   public void closeOperation(OperationHandle opHandle) throws HiveSQLException 
{
-    try (Operation operation = removeOperation(opHandle)) {
-      if (operation == null) {
-        throw new HiveSQLException("Operation does not exist!");
-      }
+    Operation operation = removeOperation(opHandle);
+    if (operation == null) {
+      throw new HiveSQLException("Operation does not exist!");
     }
+    operation.close();
   }
 
   public TableSchema getOperationResultSetSchema(OperationHandle opHandle)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to