Repository: spark Updated Branches: refs/heads/master 44c1e1ab1 -> 5ae20cf1a
Revert "[SPARK-25408] Move to mode ideomatic Java8" This reverts commit 44c1e1ab1c26560371831b1593f96f30344c4363. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5ae20cf1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5ae20cf1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5ae20cf1 Branch: refs/heads/master Commit: 5ae20cf1a96a33f5de4435fcfb55914d64466525 Parents: 44c1e1a Author: Wenchen Fan <[email protected]> Authored: Fri Oct 5 11:03:41 2018 +0800 Committer: Wenchen Fan <[email protected]> Committed: Fri Oct 5 11:03:41 2018 +0800 ---------------------------------------------------------------------- .../spark/util/kvstore/KVStoreSerializer.java | 10 +- .../apache/spark/util/kvstore/LevelDBSuite.java | 2 +- .../network/ChunkFetchIntegrationSuite.java | 53 +++++---- .../shuffle/ShuffleIndexInformation.java | 8 +- .../ExternalShuffleBlockResolverSuite.java | 22 ++-- .../ExternalShuffleIntegrationSuite.java | 51 +++++---- .../shuffle/ExternalShuffleSecuritySuite.java | 15 +-- .../spark/util/sketch/CountMinSketch.java | 7 +- .../spark/util/sketch/CountMinSketchImpl.java | 8 +- .../apache/spark/io/ReadAheadInputStream.java | 102 ++++++++--------- .../sort/BypassMergeSortShuffleWriter.java | 6 +- .../shuffle/sort/ShuffleExternalSorter.java | 63 ++++++----- .../java/org/apache/spark/JavaJdbcRDDSuite.java | 28 +++-- .../shuffle/sort/UnsafeShuffleWriterSuite.java | 14 +-- .../test/org/apache/spark/JavaAPISuite.java | 27 ++--- .../expressions/RowBasedKeyValueBatch.java | 3 +- .../expressions/RowBasedKeyValueBatchSuite.java | 110 ++++++++++++------- .../org/apache/hive/service/cli/CLIService.java | 9 +- .../service/cli/operation/OperationManager.java | 8 +- 19 files changed, 303 insertions(+), 243 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java ---------------------------------------------------------------------- diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java index 771a954..bd8d948 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java @@ -54,8 +54,11 @@ public class KVStoreSerializer { return ((String) o).getBytes(UTF_8); } else { ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - try (GZIPOutputStream out = new GZIPOutputStream(bytes)) { + GZIPOutputStream out = new GZIPOutputStream(bytes); + try { mapper.writeValue(out, o); + } finally { + out.close(); } return bytes.toByteArray(); } @@ -66,8 +69,11 @@ public class KVStoreSerializer { if (klass.equals(String.class)) { return (T) new String(data, UTF_8); } else { - try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) { + GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data)); + try { return mapper.readValue(in, klass); + } finally { + in.close(); } } } http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java ---------------------------------------------------------------------- diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java index 39a952f..205f7df 100644 --- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java +++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java @@ -217,7 +217,7 @@ public class LevelDBSuite { public void testNegativeIndexValues() throws Exception { List<Integer> expected = Arrays.asList(-100, -50, 0, 50, 100); - expected.forEach(i -> { + expected.stream().forEach(i -> { try { db.write(createCustomType1(i)); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java b/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java index 9656a9a..824482a 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java @@ -143,38 +143,37 @@ public class ChunkFetchIntegrationSuite { } private FetchResult fetchChunks(List<Integer> chunkIndices) throws Exception { - final FetchResult res = new FetchResult(); - - try (TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort())) { - final Semaphore sem = new Semaphore(0); + TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort()); + final Semaphore sem = new Semaphore(0); - res.successChunks = Collections.synchronizedSet(new HashSet<Integer>()); - res.failedChunks = Collections.synchronizedSet(new HashSet<Integer>()); - res.buffers = Collections.synchronizedList(new LinkedList<ManagedBuffer>()); - - ChunkReceivedCallback callback = new ChunkReceivedCallback() { - @Override - public void onSuccess(int chunkIndex, ManagedBuffer buffer) { - buffer.retain(); - res.successChunks.add(chunkIndex); - res.buffers.add(buffer); - sem.release(); - } - - @Override - public void onFailure(int chunkIndex, Throwable e) { - res.failedChunks.add(chunkIndex); - sem.release(); - } - }; + final FetchResult res = new FetchResult(); + res.successChunks = Collections.synchronizedSet(new HashSet<Integer>()); + res.failedChunks = Collections.synchronizedSet(new HashSet<Integer>()); + res.buffers = Collections.synchronizedList(new LinkedList<ManagedBuffer>()); - for (int chunkIndex : chunkIndices) { - client.fetchChunk(STREAM_ID, chunkIndex, callback); + ChunkReceivedCallback callback = new ChunkReceivedCallback() { + @Override + public void onSuccess(int chunkIndex, ManagedBuffer buffer) { + buffer.retain(); + res.successChunks.add(chunkIndex); + res.buffers.add(buffer); + sem.release(); } - if (!sem.tryAcquire(chunkIndices.size(), 5, TimeUnit.SECONDS)) { - fail("Timeout getting response from the server"); + + @Override + public void onFailure(int chunkIndex, Throwable e) { + res.failedChunks.add(chunkIndex); + sem.release(); } + }; + + for (int chunkIndex : chunkIndices) { + client.fetchChunk(STREAM_ID, chunkIndex, callback); + } + if (!sem.tryAcquire(chunkIndices.size(), 5, TimeUnit.SECONDS)) { + fail("Timeout getting response from the server"); } + client.close(); return res; } http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java index 371149b..386738e 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java @@ -37,8 +37,14 @@ public class ShuffleIndexInformation { size = (int)indexFile.length(); ByteBuffer buffer = ByteBuffer.allocate(size); offsets = buffer.asLongBuffer(); - try (DataInputStream dis = new DataInputStream(Files.newInputStream(indexFile.toPath()))) { + DataInputStream dis = null; + try { + dis = new DataInputStream(Files.newInputStream(indexFile.toPath())); dis.readFully(buffer.array()); + } finally { + if (dis != null) { + dis.close(); + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java index 44bc25a..d2072a5 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java @@ -98,15 +98,19 @@ public class ExternalShuffleBlockResolverSuite { resolver.registerExecutor("app0", "exec0", dataContext.createExecutorInfo(SORT_MANAGER)); - try (InputStream block0Stream = resolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream()) { - String block0 = CharStreams.toString(new InputStreamReader(block0Stream, StandardCharsets.UTF_8)); - assertEquals(sortBlock0, block0); - } - - try (InputStream block1Stream = resolver.getBlockData("app0", "exec0", 0, 0, 1).createInputStream()) { - String block1 = CharStreams.toString(new InputStreamReader(block1Stream, StandardCharsets.UTF_8)); - assertEquals(sortBlock1, block1); - } + InputStream block0Stream = + resolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream(); + String block0 = CharStreams.toString( + new InputStreamReader(block0Stream, StandardCharsets.UTF_8)); + block0Stream.close(); + assertEquals(sortBlock0, block0); + + InputStream block1Stream = + resolver.getBlockData("app0", "exec0", 0, 0, 1).createInputStream(); + String block1 = CharStreams.toString( + new InputStreamReader(block1Stream, StandardCharsets.UTF_8)); + block1Stream.close(); + assertEquals(sortBlock1, block1); } @Test http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java index 41bee40..a6a1b8d 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java @@ -133,38 +133,37 @@ public class ExternalShuffleIntegrationSuite { final Semaphore requestsRemaining = new Semaphore(0); - try (ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000)) { - client.init(APP_ID); - client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds, - new BlockFetchingListener() { - @Override - public void onBlockFetchSuccess(String blockId, ManagedBuffer data) { - synchronized (this) { - if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) { - data.retain(); - res.successBlocks.add(blockId); - res.buffers.add(data); - requestsRemaining.release(); - } + ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000); + client.init(APP_ID); + client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds, + new BlockFetchingListener() { + @Override + public void onBlockFetchSuccess(String blockId, ManagedBuffer data) { + synchronized (this) { + if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) { + data.retain(); + res.successBlocks.add(blockId); + res.buffers.add(data); + requestsRemaining.release(); } } - - @Override - public void onBlockFetchFailure(String blockId, Throwable exception) { - synchronized (this) { - if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) { - res.failedBlocks.add(blockId); - requestsRemaining.release(); - } + } + + @Override + public void onBlockFetchFailure(String blockId, Throwable exception) { + synchronized (this) { + if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) { + res.failedBlocks.add(blockId); + requestsRemaining.release(); } } - }, null - ); + } + }, null); - if (!requestsRemaining.tryAcquire(blockIds.length, 5, TimeUnit.SECONDS)) { - fail("Timeout getting response from the server"); - } + if (!requestsRemaining.tryAcquire(blockIds.length, 5, TimeUnit.SECONDS)) { + fail("Timeout getting response from the server"); } + client.close(); return res; } http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java index dafefaa..16bad9f 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java @@ -96,13 +96,14 @@ public class ExternalShuffleSecuritySuite { ImmutableMap.of("spark.authenticate.enableSaslEncryption", "true"))); } - try (ExternalShuffleClient client = - new ExternalShuffleClient(testConf, new TestSecretKeyHolder(appId, secretKey), true, 5000)) { - client.init(appId); - // Registration either succeeds or throws an exception. - client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0", - new ExecutorShuffleInfo(new String[0], 0, "org.apache.spark.shuffle.sort.SortShuffleManager")); - } + ExternalShuffleClient client = + new ExternalShuffleClient(testConf, new TestSecretKeyHolder(appId, secretKey), true, 5000); + client.init(appId); + // Registration either succeeds or throws an exception. + client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0", + new ExecutorShuffleInfo(new String[0], 0, + "org.apache.spark.shuffle.sort.SortShuffleManager")); + client.close(); } /** Provides a secret key holder which always returns the given secret key, for a single appId. */ http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java ---------------------------------------------------------------------- diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java index 06a248c..f7c22dd 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java @@ -191,9 +191,10 @@ public abstract class CountMinSketch { * Reads in a {@link CountMinSketch} from a byte array. */ public static CountMinSketch readFrom(byte[] bytes) throws IOException { - try (InputStream in = new ByteArrayInputStream(bytes)) { - return readFrom(in); - } + InputStream in = new ByteArrayInputStream(bytes); + CountMinSketch cms = readFrom(in); + in.close(); + return cms; } /** http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java ---------------------------------------------------------------------- diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java index b78c167..fd1906d 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java @@ -322,10 +322,10 @@ class CountMinSketchImpl extends CountMinSketch implements Serializable { @Override public byte[] toByteArray() throws IOException { - try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - writeTo(out); - return out.toByteArray(); - } + ByteArrayOutputStream out = new ByteArrayOutputStream(); + writeTo(out); + out.close(); + return out.toByteArray(); } public static CountMinSketchImpl readFrom(InputStream in) throws IOException { http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java index 2e18715..0cced9e 100644 --- a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java +++ b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java @@ -135,58 +135,62 @@ public class ReadAheadInputStream extends InputStream { } finally { stateChangeLock.unlock(); } - executorService.execute(() -> { - stateChangeLock.lock(); - try { - if (isClosed) { - readInProgress = false; - return; - } - // Flip this so that the close method will not close the underlying input stream when we - // are reading. - isReading = true; - } finally { - stateChangeLock.unlock(); - } + executorService.execute(new Runnable() { - // Please note that it is safe to release the lock and read into the read ahead buffer - // because either of following two conditions will hold - 1. The active buffer has - // data available to read so the reader will not read from the read ahead buffer. - // 2. This is the first time read is called or the active buffer is exhausted, - // in that case the reader waits for this async read to complete. - // So there is no race condition in both the situations. - int read = 0; - int off = 0, len = arr.length; - Throwable exception = null; - try { - // try to fill the read ahead buffer. - // if a reader is waiting, possibly return early. - do { - read = underlyingInputStream.read(arr, off, len); - if (read <= 0) break; - off += read; - len -= read; - } while (len > 0 && !isWaiting.get()); - } catch (Throwable ex) { - exception = ex; - if (ex instanceof Error) { - // `readException` may not be reported to the user. Rethrow Error to make sure at least - // The user can see Error in UncaughtExceptionHandler. - throw (Error) ex; - } - } finally { + @Override + public void run() { stateChangeLock.lock(); - readAheadBuffer.limit(off); - if (read < 0 || (exception instanceof EOFException)) { - endOfStream = true; - } else if (exception != null) { - readAborted = true; - readException = exception; + try { + if (isClosed) { + readInProgress = false; + return; + } + // Flip this so that the close method will not close the underlying input stream when we + // are reading. + isReading = true; + } finally { + stateChangeLock.unlock(); + } + + // Please note that it is safe to release the lock and read into the read ahead buffer + // because either of following two conditions will hold - 1. The active buffer has + // data available to read so the reader will not read from the read ahead buffer. + // 2. This is the first time read is called or the active buffer is exhausted, + // in that case the reader waits for this async read to complete. + // So there is no race condition in both the situations. + int read = 0; + int off = 0, len = arr.length; + Throwable exception = null; + try { + // try to fill the read ahead buffer. + // if a reader is waiting, possibly return early. + do { + read = underlyingInputStream.read(arr, off, len); + if (read <= 0) break; + off += read; + len -= read; + } while (len > 0 && !isWaiting.get()); + } catch (Throwable ex) { + exception = ex; + if (ex instanceof Error) { + // `readException` may not be reported to the user. Rethrow Error to make sure at least + // The user can see Error in UncaughtExceptionHandler. + throw (Error) ex; + } + } finally { + stateChangeLock.lock(); + readAheadBuffer.limit(off); + if (read < 0 || (exception instanceof EOFException)) { + endOfStream = true; + } else if (exception != null) { + readAborted = true; + readException = exception; + } + readInProgress = false; + signalAsyncReadComplete(); + stateChangeLock.unlock(); + closeUnderlyingInputStreamIfNecessary(); } - readInProgress = false; - signalAsyncReadComplete(); - stateChangeLock.unlock(); - closeUnderlyingInputStreamIfNecessary(); } }); } http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index abe027f..323a5d3 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -152,9 +152,9 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> { } for (int i = 0; i < numPartitions; i++) { - try (final DiskBlockObjectWriter writer = partitionWriters[i]) { - partitionWriterSegments[i] = writer.commitAndGet(); - } + final DiskBlockObjectWriter writer = partitionWriters[i]; + partitionWriterSegments[i] = writer.commitAndGet(); + writer.close(); } File output = shuffleBlockResolver.getDataFile(shuffleId, mapId); http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java index ad66074..c7d2db4 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java @@ -181,43 +181,42 @@ final class ShuffleExternalSorter extends MemoryConsumer { // around this, we pass a dummy no-op serializer. final SerializerInstance ser = DummySerializerInstance.INSTANCE; - int currentPartition = -1; - final FileSegment committedSegment; - try (final DiskBlockObjectWriter writer = - blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse)) { - - final int uaoSize = UnsafeAlignedOffset.getUaoSize(); - while (sortedRecords.hasNext()) { - sortedRecords.loadNext(); - final int partition = sortedRecords.packedRecordPointer.getPartitionId(); - assert (partition >= currentPartition); - if (partition != currentPartition) { - // Switch to the new partition - if (currentPartition != -1) { - final FileSegment fileSegment = writer.commitAndGet(); - spillInfo.partitionLengths[currentPartition] = fileSegment.length(); - } - currentPartition = partition; - } + final DiskBlockObjectWriter writer = + blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse); - final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer(); - final Object recordPage = taskMemoryManager.getPage(recordPointer); - final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer); - int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage); - long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length - while (dataRemaining > 0) { - final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining); - Platform.copyMemory( - recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer); - writer.write(writeBuffer, 0, toTransfer); - recordReadPosition += toTransfer; - dataRemaining -= toTransfer; + int currentPartition = -1; + final int uaoSize = UnsafeAlignedOffset.getUaoSize(); + while (sortedRecords.hasNext()) { + sortedRecords.loadNext(); + final int partition = sortedRecords.packedRecordPointer.getPartitionId(); + assert (partition >= currentPartition); + if (partition != currentPartition) { + // Switch to the new partition + if (currentPartition != -1) { + final FileSegment fileSegment = writer.commitAndGet(); + spillInfo.partitionLengths[currentPartition] = fileSegment.length(); } - writer.recordWritten(); + currentPartition = partition; } - committedSegment = writer.commitAndGet(); + final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer(); + final Object recordPage = taskMemoryManager.getPage(recordPointer); + final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer); + int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage); + long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length + while (dataRemaining > 0) { + final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining); + Platform.copyMemory( + recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer); + writer.write(writeBuffer, 0, toTransfer); + recordReadPosition += toTransfer; + dataRemaining -= toTransfer; + } + writer.recordWritten(); } + + final FileSegment committedSegment = writer.commitAndGet(); + writer.close(); // If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted, // then the file might be empty. Note that it might be better to avoid calling // writeSortedFile() in that case. http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java b/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java index c35661e..a6589d2 100644 --- a/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java +++ b/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java @@ -39,26 +39,30 @@ public class JavaJdbcRDDSuite implements Serializable { sc = new JavaSparkContext("local", "JavaAPISuite"); Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + Connection connection = + DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;create=true"); - try (Connection connection = DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;create=true")) { - - try (Statement create = connection.createStatement()) { - create.execute( - "CREATE TABLE FOO(ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," + - "DATA INTEGER)"); - } + try { + Statement create = connection.createStatement(); + create.execute( + "CREATE TABLE FOO(" + + "ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," + + "DATA INTEGER)"); + create.close(); - try (PreparedStatement insert = connection.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)")) { - for (int i = 1; i <= 100; i++) { - insert.setInt(1, i * 2); - insert.executeUpdate(); - } + PreparedStatement insert = connection.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)"); + for (int i = 1; i <= 100; i++) { + insert.setInt(1, i * 2); + insert.executeUpdate(); } + insert.close(); } catch (SQLException e) { // If table doesn't exist... if (e.getSQLState().compareTo("X0Y32") != 0) { throw e; } + } finally { + connection.close(); } } http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index a07d0e8..0d5c5ea 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -186,14 +186,14 @@ public class UnsafeShuffleWriterSuite { if (conf.getBoolean("spark.shuffle.compress", true)) { in = CompressionCodec$.MODULE$.createCodec(conf).compressedInputStream(in); } - try (DeserializationStream recordsStream = serializer.newInstance().deserializeStream(in)) { - Iterator<Tuple2<Object, Object>> records = recordsStream.asKeyValueIterator(); - while (records.hasNext()) { - Tuple2<Object, Object> record = records.next(); - assertEquals(i, hashPartitioner.getPartition(record._1())); - recordsList.add(record); - } + DeserializationStream recordsStream = serializer.newInstance().deserializeStream(in); + Iterator<Tuple2<Object, Object>> records = recordsStream.asKeyValueIterator(); + while (records.hasNext()) { + Tuple2<Object, Object> record = records.next(); + assertEquals(i, hashPartitioner.getPartition(record._1())); + recordsList.add(record); } + recordsStream.close(); startOffset += partitionSize; } } http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/core/src/test/java/test/org/apache/spark/JavaAPISuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java index 3992ab7..01b5fb7 100644 --- a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java @@ -997,10 +997,10 @@ public class JavaAPISuite implements Serializable { FileOutputStream fos1 = new FileOutputStream(file1); - try (FileChannel channel1 = fos1.getChannel()) { - ByteBuffer bbuf = ByteBuffer.wrap(content1); - channel1.write(bbuf); - } + FileChannel channel1 = fos1.getChannel(); + ByteBuffer bbuf = ByteBuffer.wrap(content1); + channel1.write(bbuf); + channel1.close(); JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName, 3); List<Tuple2<String, PortableDataStream>> result = readRDD.collect(); for (Tuple2<String, PortableDataStream> res : result) { @@ -1018,10 +1018,10 @@ public class JavaAPISuite implements Serializable { FileOutputStream fos1 = new FileOutputStream(file1); - try (FileChannel channel1 = fos1.getChannel()) { - ByteBuffer bbuf = ByteBuffer.wrap(content1); - channel1.write(bbuf); - } + FileChannel channel1 = fos1.getChannel(); + ByteBuffer bbuf = ByteBuffer.wrap(content1); + channel1.write(bbuf); + channel1.close(); JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName).cache(); readRDD.foreach(pair -> pair._2().toArray()); // force the file to read @@ -1042,12 +1042,13 @@ public class JavaAPISuite implements Serializable { FileOutputStream fos1 = new FileOutputStream(file1); - try (FileChannel channel1 = fos1.getChannel()) { - for (int i = 0; i < numOfCopies; i++) { - ByteBuffer bbuf = ByteBuffer.wrap(content1); - channel1.write(bbuf); - } + FileChannel channel1 = fos1.getChannel(); + + for (int i = 0; i < numOfCopies; i++) { + ByteBuffer bbuf = ByteBuffer.wrap(content1); + channel1.write(bbuf); } + channel1.close(); JavaRDD<byte[]> readRDD = sc.binaryRecords(tempDirName, content1.length); assertEquals(numOfCopies,readRDD.count()); http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java index 4605138..551443a 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java @@ -16,7 +16,6 @@ */ package org.apache.spark.sql.catalyst.expressions; -import java.io.Closeable; import java.io.IOException; import org.apache.spark.memory.MemoryConsumer; @@ -46,7 +45,7 @@ import org.slf4j.LoggerFactory; * page requires an average size for key value pairs to be larger than 1024 bytes. * */ -public abstract class RowBasedKeyValueBatch extends MemoryConsumer implements Closeable { +public abstract class RowBasedKeyValueBatch extends MemoryConsumer { protected final Logger logger = LoggerFactory.getLogger(RowBasedKeyValueBatch.class); private static final int DEFAULT_CAPACITY = 1 << 16; http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java index ef02f0a..2da8711 100644 --- a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java +++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java @@ -123,8 +123,9 @@ public class RowBasedKeyValueBatchSuite { @Test public void emptyBatch() throws Exception { - try (RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema, - valueSchema, taskMemoryManager, DEFAULT_CAPACITY)) { + RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema, + valueSchema, taskMemoryManager, DEFAULT_CAPACITY); + try { Assert.assertEquals(0, batch.numRows()); try { batch.getKeyRow(-1); @@ -151,24 +152,31 @@ public class RowBasedKeyValueBatchSuite { // Expected exception; do nothing. } Assert.assertFalse(batch.rowIterator().next()); + } finally { + batch.close(); } } @Test - public void batchType() { - try (RowBasedKeyValueBatch batch1 = RowBasedKeyValueBatch.allocate(keySchema, + public void batchType() throws Exception { + RowBasedKeyValueBatch batch1 = RowBasedKeyValueBatch.allocate(keySchema, + valueSchema, taskMemoryManager, DEFAULT_CAPACITY); + RowBasedKeyValueBatch batch2 = RowBasedKeyValueBatch.allocate(fixedKeySchema, valueSchema, taskMemoryManager, DEFAULT_CAPACITY); - RowBasedKeyValueBatch batch2 = RowBasedKeyValueBatch.allocate(fixedKeySchema, - valueSchema, taskMemoryManager, DEFAULT_CAPACITY)) { + try { Assert.assertEquals(batch1.getClass(), VariableLengthRowBasedKeyValueBatch.class); Assert.assertEquals(batch2.getClass(), FixedLengthRowBasedKeyValueBatch.class); + } finally { + batch1.close(); + batch2.close(); } } @Test public void setAndRetrieve() { - try (RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema, - valueSchema, taskMemoryManager, DEFAULT_CAPACITY)) { + RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema, + valueSchema, taskMemoryManager, DEFAULT_CAPACITY); + try { UnsafeRow ret1 = appendRow(batch, makeKeyRow(1, "A"), makeValueRow(1, 1)); Assert.assertTrue(checkValue(ret1, 1, 1)); UnsafeRow ret2 = appendRow(batch, makeKeyRow(2, "B"), makeValueRow(2, 2)); @@ -196,27 +204,33 @@ public class RowBasedKeyValueBatchSuite { } catch (AssertionError e) { // Expected exception; do nothing. } + } finally { + batch.close(); } } @Test public void setUpdateAndRetrieve() { - try (RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema, - valueSchema, taskMemoryManager, DEFAULT_CAPACITY)) { + RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema, + valueSchema, taskMemoryManager, DEFAULT_CAPACITY); + try { appendRow(batch, makeKeyRow(1, "A"), makeValueRow(1, 1)); Assert.assertEquals(1, batch.numRows()); UnsafeRow retrievedValue = batch.getValueRow(0); updateValueRow(retrievedValue, 2, 2); UnsafeRow retrievedValue2 = batch.getValueRow(0); Assert.assertTrue(checkValue(retrievedValue2, 2, 2)); + } finally { + batch.close(); } } @Test public void iteratorTest() throws Exception { - try (RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema, - valueSchema, taskMemoryManager, DEFAULT_CAPACITY)) { + RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema, + valueSchema, taskMemoryManager, DEFAULT_CAPACITY); + try { appendRow(batch, makeKeyRow(1, "A"), makeValueRow(1, 1)); appendRow(batch, makeKeyRow(2, "B"), makeValueRow(2, 2)); appendRow(batch, makeKeyRow(3, "C"), makeValueRow(3, 3)); @@ -239,13 +253,16 @@ public class RowBasedKeyValueBatchSuite { Assert.assertTrue(checkKey(key3, 3, "C")); Assert.assertTrue(checkValue(value3, 3, 3)); Assert.assertFalse(iterator.next()); + } finally { + batch.close(); } } @Test public void fixedLengthTest() throws Exception { - try (RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(fixedKeySchema, - valueSchema, taskMemoryManager, DEFAULT_CAPACITY)) { + RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(fixedKeySchema, + valueSchema, taskMemoryManager, DEFAULT_CAPACITY); + try { appendRow(batch, makeKeyRow(11, 11), makeValueRow(1, 1)); appendRow(batch, makeKeyRow(22, 22), makeValueRow(2, 2)); appendRow(batch, makeKeyRow(33, 33), makeValueRow(3, 3)); @@ -276,13 +293,16 @@ public class RowBasedKeyValueBatchSuite { Assert.assertTrue(checkKey(key3, 33, 33)); Assert.assertTrue(checkValue(value3, 3, 3)); Assert.assertFalse(iterator.next()); + } finally { + batch.close(); } } @Test public void appendRowUntilExceedingCapacity() throws Exception { - try (RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema, - valueSchema, taskMemoryManager, 10)) { + RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema, + valueSchema, taskMemoryManager, 10); + try { UnsafeRow key = makeKeyRow(1, "A"); UnsafeRow value = makeValueRow(1, 1); for (int i = 0; i < 10; i++) { @@ -301,6 +321,8 @@ public class RowBasedKeyValueBatchSuite { Assert.assertTrue(checkValue(value1, 1, 1)); } Assert.assertFalse(iterator.next()); + } finally { + batch.close(); } } @@ -308,8 +330,9 @@ public class RowBasedKeyValueBatchSuite { public void appendRowUntilExceedingPageSize() throws Exception { // Use default size or spark.buffer.pageSize if specified int pageSizeToUse = (int) memoryManager.pageSizeBytes(); - try (RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema, - valueSchema, taskMemoryManager, pageSizeToUse)) { + RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema, + valueSchema, taskMemoryManager, pageSizeToUse); //enough capacity + try { UnsafeRow key = makeKeyRow(1, "A"); UnsafeRow value = makeValueRow(1, 1); int recordLength = 8 + key.getSizeInBytes() + value.getSizeInBytes() + 8; @@ -333,44 +356,49 @@ public class RowBasedKeyValueBatchSuite { Assert.assertTrue(checkValue(value1, 1, 1)); } Assert.assertFalse(iterator.next()); + } finally { + batch.close(); } } @Test public void failureToAllocateFirstPage() throws Exception { memoryManager.limit(1024); - try (RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema, - valueSchema, taskMemoryManager, DEFAULT_CAPACITY)) { + RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema, + valueSchema, taskMemoryManager, DEFAULT_CAPACITY); + try { UnsafeRow key = makeKeyRow(1, "A"); UnsafeRow value = makeValueRow(11, 11); UnsafeRow ret = appendRow(batch, key, value); Assert.assertNull(ret); Assert.assertFalse(batch.rowIterator().next()); + } finally { + batch.close(); } } @Test public void randomizedTest() { - try (RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema, - valueSchema, taskMemoryManager, DEFAULT_CAPACITY)) { - int numEntry = 100; - long[] expectedK1 = new long[numEntry]; - String[] expectedK2 = new String[numEntry]; - long[] expectedV1 = new long[numEntry]; - long[] expectedV2 = new long[numEntry]; - - for (int i = 0; i < numEntry; i++) { - long k1 = rand.nextLong(); - String k2 = getRandomString(rand.nextInt(256)); - long v1 = rand.nextLong(); - long v2 = rand.nextLong(); - appendRow(batch, makeKeyRow(k1, k2), makeValueRow(v1, v2)); - expectedK1[i] = k1; - expectedK2[i] = k2; - expectedV1[i] = v1; - expectedV2[i] = v2; - } - + RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema, + valueSchema, taskMemoryManager, DEFAULT_CAPACITY); + int numEntry = 100; + long[] expectedK1 = new long[numEntry]; + String[] expectedK2 = new String[numEntry]; + long[] expectedV1 = new long[numEntry]; + long[] expectedV2 = new long[numEntry]; + + for (int i = 0; i < numEntry; i++) { + long k1 = rand.nextLong(); + String k2 = getRandomString(rand.nextInt(256)); + long v1 = rand.nextLong(); + long v2 = rand.nextLong(); + appendRow(batch, makeKeyRow(k1, k2), makeValueRow(v1, v2)); + expectedK1[i] = k1; + expectedK2[i] = k2; + expectedV1[i] = v1; + expectedV2[i] = v2; + } + try { for (int j = 0; j < 10000; j++) { int rowId = rand.nextInt(numEntry); if (rand.nextBoolean()) { @@ -382,6 +410,8 @@ public class RowBasedKeyValueBatchSuite { Assert.assertTrue(checkValue(value, expectedV1[rowId], expectedV2[rowId])); } } + } finally { + batch.close(); } } } http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java index 3cbc2c4..791ddcb 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java @@ -146,11 +146,18 @@ public class CLIService extends CompositeService implements ICLIService { public synchronized void start() { super.start(); // Initialize and test a connection to the metastore - try (IMetaStoreClient metastoreClient = new HiveMetaStoreClient(hiveConf)) { + IMetaStoreClient metastoreClient = null; + try { + metastoreClient = new HiveMetaStoreClient(hiveConf); metastoreClient.getDatabases("default"); } catch (Exception e) { throw new ServiceException("Unable to connect to MetaStore!", e); } + finally { + if (metastoreClient != null) { + metastoreClient.close(); + } + } } @Override http://git-wip-us.apache.org/repos/asf/spark/blob/5ae20cf1/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java index 4a8779e..92c340a 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -197,11 +197,11 @@ public class OperationManager extends AbstractService { } public void closeOperation(OperationHandle opHandle) throws HiveSQLException { - try (Operation operation = removeOperation(opHandle)) { - if (operation == null) { - throw new HiveSQLException("Operation does not exist!"); - } + Operation operation = removeOperation(opHandle); + if (operation == null) { + throw new HiveSQLException("Operation does not exist!"); } + operation.close(); } public TableSchema getOperationResultSetSchema(OperationHandle opHandle) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
