// Origin: http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
// (file added by this commit: new file mode 100644, index 0000000..0946d98)
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.blob;

import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketException;
import java.security.MessageDigest;

import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_ADDRESSABLE;
import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_ID_SCOPE;
import static org.apache.flink.runtime.blob.BlobServerProtocol.NAME_ADDRESSABLE;
import static org.apache.flink.runtime.blob.BlobUtils.closeSilently;
import static org.apache.flink.runtime.blob.BlobUtils.readFully;
import static org.apache.flink.runtime.blob.BlobUtils.readLength;
import static org.apache.flink.runtime.blob.BlobUtils.writeLength;

import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
import static org.apache.flink.runtime.blob.BlobServerProtocol.DELETE_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.MAX_KEY_LENGTH;
import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;

/**
 * A BLOB connection handles a series of requests from a particular BLOB client.
 *
 * <p>Each connection runs in its own daemon thread. The thread reads one operation code at a time
 * from the client socket and dispatches to the PUT, GET, or DELETE handler until the client closes
 * the stream or an error occurs. When the thread ends, it closes the socket and unregisters itself
 * from the BLOB server.
 */
class BlobServerConnection extends Thread {

	/** The log object used for debugging. */
	private static final Logger LOG = LoggerFactory.getLogger(BlobServerConnection.class);

	/** The socket to communicate with the client. */
	private final Socket clientSocket;

	/** The BLOB server. */
	private final BlobServer blobServer;

	/**
	 * Creates a new BLOB connection for a client request.
	 *
	 * @param clientSocket The socket to read/write data.
	 * @param blobServer The BLOB server.
	 * @throws NullPointerException Thrown, if the socket or the BLOB server is null.
	 */
	BlobServerConnection(Socket clientSocket, BlobServer blobServer) {
		super("BLOB connection for " + clientSocket.getRemoteSocketAddress().toString());
		setDaemon(true);

		if (blobServer == null) {
			throw new NullPointerException("blobServer must not be null");
		}

		this.clientSocket = clientSocket;
		this.blobServer = blobServer;
	}

	// --------------------------------------------------------------------------------------------
	//  Connection / Thread methods
	// --------------------------------------------------------------------------------------------

	/**
	 * Main connection work method. Accepts requests until the other side closes the connection.
	 */
	@Override
	public void run() {
		try {
			final InputStream inputStream = this.clientSocket.getInputStream();
			final OutputStream outputStream = this.clientSocket.getOutputStream();
			// one transfer buffer is shared by all requests served on this connection
			final byte[] buffer = new byte[BUFFER_SIZE];

			while (true) {
				// Read the requested operation
				final int operation = inputStream.read();
				if (operation < 0) {
					// done, no one is asking anything from us
					return;
				}

				switch (operation) {
					case PUT_OPERATION:
						put(inputStream, outputStream, buffer);
						break;
					case GET_OPERATION:
						get(inputStream, outputStream, buffer);
						break;
					case DELETE_OPERATION:
						delete(inputStream, outputStream, buffer);
						break;
					default:
						throw new IOException("Unknown operation " + operation);
				}
			}
		}
		catch (SocketException e) {
			// this happens when the remote site closes the connection
			LOG.debug("Socket connection closed", e);
		}
		catch (Throwable t) {
			LOG.error("Error while executing BLOB connection.", t);
		}
		finally {
			try {
				if (clientSocket != null) {
					clientSocket.close();
				}
			} catch (Throwable t) {
				LOG.debug("Exception while closing BLOB server connection socket.", t);
			}

			blobServer.unregisterConnection(this);
		}
	}

	/**
	 * Closes the connection socket and lets the thread exit.
	 */
	public void close() {
		closeSilently(clientSocket, LOG);
		interrupt();
	}

	// --------------------------------------------------------------------------------------------
	//  Actions
	// --------------------------------------------------------------------------------------------

	/**
	 * Handles an incoming GET request from a BLOB client.
	 *
	 * <p>The request is processed in two phases. While the request is parsed and the file is
	 * located, a failure is reported to the client as a serialized exception. Once the OKAY code
	 * has been written, a failure can only be signalled by closing the connection.
	 *
	 * @param inputStream
	 *        the input stream to read incoming data from
	 * @param outputStream
	 *        the output stream to send data back to the client
	 * @param buf
	 *        an auxiliary buffer for data serialization/deserialization
	 * @throws IOException
	 *         thrown if closing the client socket after a failed request fails
	 */
	private void get(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {

		File blobFile;
		try {
			final int addressingType = inputStream.read();

			if (addressingType < 0) {
				throw new EOFException("Premature end of GET request");
			}
			if (addressingType == NAME_ADDRESSABLE) {
				// Receive the job ID and key
				JobID jobID = readJobID(inputStream);
				String key = readKey(buf, inputStream);
				blobFile = this.blobServer.getStorageLocation(jobID, key);
			}
			else if (addressingType == CONTENT_ADDRESSABLE) {
				final BlobKey key = BlobKey.readFromInputStream(inputStream);
				blobFile = blobServer.getStorageLocation(key);
			}
			else {
				throw new IOException("Unknown type of BLOB addressing.");
			}

			// Check if BLOB exists
			if (!blobFile.exists()) {
				throw new IOException("Cannot find required BLOB at " + blobFile.getAbsolutePath());
			}
			// the length is transmitted as a 32 bit integer
			if (blobFile.length() > Integer.MAX_VALUE) {
				throw new IOException("BLOB size exceeds the maximum size (2 GB).");
			}

			outputStream.write(RETURN_OKAY);

			// up to here, an error can give a good message
		}
		catch (Throwable t) {
			LOG.error("GET operation failed", t);
			try {
				writeErrorToStream(outputStream, t);
			}
			catch (IOException e) {
				// since we are in an exception case, it means not much that we could not send the error
				// ignore this
			}
			clientSocket.close();
			return;
		}

		// from here on, we started sending data, so all we can do is close the connection when something happens
		try {
			int blobLen = (int) blobFile.length();
			writeLength(blobLen, outputStream);

			FileInputStream fis = new FileInputStream(blobFile);
			try {
				int bytesRemaining = blobLen;
				while (bytesRemaining > 0) {
					// never read (and hence never send) more than the length announced to the client,
					// even if the file has grown in the meantime
					int read = fis.read(buf, 0, Math.min(buf.length, bytesRemaining));
					if (read < 0) {
						throw new IOException("Premature end of BLOB file stream for " + blobFile.getAbsolutePath());
					}
					outputStream.write(buf, 0, read);
					bytesRemaining -= read;
				}
			} finally {
				fis.close();
			}
		}
		catch (SocketException e) {
			// happens when the other side disconnects
			LOG.debug("Socket connection closed", e);
		}
		catch (Throwable t) {
			LOG.error("GET operation failed", t);
			clientSocket.close();
		}
	}

	/**
	 * Handles an incoming PUT request from a BLOB client.
	 *
	 * <p>The data is first written to a temporary staging file and moved to its final storage
	 * location only after the client has sent the end-of-data marker (a chunk length of -1). For
	 * content addressable BLOBs, the key is computed from a message digest over the received data
	 * and is sent back to the client after the OKAY code.
	 *
	 * @param inputStream The input stream to read incoming data from.
	 * @param outputStream The output stream to send data back to the client.
	 * @param buf An auxiliary buffer for data serialization/deserialization.
	 * @throws IOException Thrown, if closing the client socket after a failed request fails.
	 */
	private void put(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {
		JobID jobID = null;
		String key = null;
		MessageDigest md = null;

		File incomingFile = null;
		FileOutputStream fos = null;

		try {
			final int addressingType = inputStream.read();
			if (addressingType < 0) {
				throw new EOFException("Premature end of PUT request");
			}

			if (addressingType == NAME_ADDRESSABLE) {
				// Receive the job ID and key
				jobID = readJobID(inputStream);
				key = readKey(buf, inputStream);
				LOG.debug("Received PUT request for BLOB under {} / \"{}\"", jobID, key);
			}
			else if (addressingType == CONTENT_ADDRESSABLE) {
				md = BlobUtils.createMessageDigest();
				LOG.debug("Received PUT request for content addressable BLOB");
			}
			else {
				throw new IOException("Unknown type of BLOB addressing.");
			}

			incomingFile = blobServer.createTemporaryFilename();
			fos = new FileOutputStream(incomingFile);

			while (true) {
				final int bytesExpected = readLength(inputStream);
				if (bytesExpected == -1) {
					// done
					break;
				}
				// reject any length that does not fit the buffer, including negative values
				if (bytesExpected < 0 || bytesExpected > BUFFER_SIZE) {
					throw new IOException("Unexpected number of incoming bytes: " + bytesExpected);
				}

				readFully(inputStream, buf, 0, bytesExpected, "buffer");
				fos.write(buf, 0, bytesExpected);

				if (md != null) {
					md.update(buf, 0, bytesExpected);
				}
			}

			// close before the rename; setting the reference to null keeps the finally block from closing twice
			fos.close();
			fos = null;

			final BlobKey blobKey;
			final File storageFile;
			if (addressingType == NAME_ADDRESSABLE) {
				blobKey = null;
				storageFile = this.blobServer.getStorageLocation(jobID, key);
			}
			else {
				blobKey = new BlobKey(md.digest());
				storageFile = blobServer.getStorageLocation(blobKey);
			}

			if (!incomingFile.renameTo(storageFile)) {
				throw new IOException(String.format("Cannot move staging file %s to BLOB file %s",
						incomingFile.getAbsolutePath(), storageFile.getAbsolutePath()));
			}
			// the staging file has been moved, so there is nothing left to clean up
			incomingFile = null;

			outputStream.write(RETURN_OKAY);
			if (blobKey != null) {
				// Return computed key to client for validation
				blobKey.writeToOutputStream(outputStream);
			}
		}
		catch (SocketException e) {
			// happens when the other side disconnects
			LOG.debug("Socket connection closed", e);
		}
		catch (Throwable t) {
			LOG.error("PUT operation failed", t);
			try {
				writeErrorToStream(outputStream, t);
			}
			catch (IOException e) {
				// since we are in an exception case, it means not much that we could not send the error
				// ignore this
			}
			clientSocket.close();
		}
		finally {
			if (fos != null) {
				try {
					fos.close();
				} catch (Throwable t) {
					LOG.warn("Cannot close stream to BLOB staging file", t);
				}
			}
			if (incomingFile != null) {
				if (!incomingFile.delete()) {
					LOG.warn("Cannot delete BLOB server staging file " + incomingFile.getAbsolutePath());
				}
			}
		}
	}

	/**
	 * Handles an incoming DELETE request from a BLOB client.
	 *
	 * <p>Depending on the addressing type, this deletes a single content addressable BLOB, a single
	 * BLOB addressed by job ID and key, or the directory of a job. Any failure is reported to the
	 * client as a serialized exception, and the connection is closed.
	 *
	 * @param inputStream The input stream to read the request from.
	 * @param outputStream The output stream to write the response to.
	 * @param buf An auxiliary buffer for data deserialization.
	 * @throws java.io.IOException Thrown, if closing the client socket after a failed request fails.
	 */
	private void delete(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {

		try {
			int type = inputStream.read();
			if (type < 0) {
				throw new EOFException("Premature end of DELETE request");
			}

			if (type == CONTENT_ADDRESSABLE) {
				BlobKey key = BlobKey.readFromInputStream(inputStream);
				deleteBlobFile(this.blobServer.getStorageLocation(key));
			}
			else if (type == NAME_ADDRESSABLE) {
				JobID jobID = readJobID(inputStream);
				String key = readKey(buf, inputStream);
				deleteBlobFile(this.blobServer.getStorageLocation(jobID, key));
			}
			else if (type == JOB_ID_SCOPE) {
				JobID jobID = readJobID(inputStream);
				blobServer.deleteJobDirectory(jobID);
			}
			else {
				throw new IOException("Unrecognized addressing type: " + type);
			}

			outputStream.write(RETURN_OKAY);
		}
		catch (Throwable t) {
			LOG.error("DELETE operation failed", t);
			try {
				writeErrorToStream(outputStream, t);
			}
			catch (IOException e) {
				// since we are in an exception case, it means not much that we could not send the error
				// ignore this
			}
			clientSocket.close();
		}
	}

	// --------------------------------------------------------------------------------------------
	//  Utilities
	// --------------------------------------------------------------------------------------------

	/**
	 * Reads a job ID ({@code JobID.SIZE} bytes) from the given input stream.
	 *
	 * @param inputStream The input stream to read the job ID from.
	 * @return The job ID created from the received bytes.
	 * @throws IOException Thrown, if the bytes of the job ID could not be read completely.
	 */
	private static JobID readJobID(InputStream inputStream) throws IOException {
		final byte[] jidBytes = new byte[JobID.SIZE];
		readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
		return JobID.fromByteArray(jidBytes);
	}

	/**
	 * Deletes the given BLOB file, if it exists.
	 *
	 * @param blobFile The file to delete.
	 * @throws IOException Thrown, if the file exists but could not be deleted.
	 */
	private static void deleteBlobFile(File blobFile) throws IOException {
		if (blobFile.exists() && !blobFile.delete()) {
			throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
		}
	}

	/**
	 * Reads the key of a BLOB from the given input stream.
	 *
	 * @param buf
	 *        auxiliary buffer for data deserialization
	 * @param inputStream
	 *        the input stream to read the key from
	 * @return the key of a BLOB
	 * @throws IOException
	 *         thrown if the announced key length is negative or larger than the maximum key length,
	 *         or if an I/O error occurs while reading the key data from the input stream
	 */
	private static String readKey(byte[] buf, InputStream inputStream) throws IOException {
		final int keyLength = readLength(inputStream);
		// a negative length would otherwise surface as an index error when building the string
		if (keyLength < 0 || keyLength > MAX_KEY_LENGTH) {
			throw new IOException("Unexpected key length " + keyLength);
		}

		readFully(inputStream, buf, 0, keyLength, "BlobKey");
		return new String(buf, 0, keyLength, BlobUtils.DEFAULT_CHARSET);
	}

	/**
	 * Writes to the output stream the error return code, and the given exception in serialized form.
	 *
	 * @param out The output stream to write to.
	 * @param t The exception to send.
	 * @throws IOException Thrown, if the output stream could not be written to.
	 */
	private static void writeErrorToStream(OutputStream out, Throwable t) throws IOException {
		byte[] bytes = InstantiationUtil.serializeObject(t);
		out.write(RETURN_ERROR);
		writeLength(bytes.length, out);
		out.write(bytes);
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java new file mode 100644 index 0000000..6df7811 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.blob; + +public class BlobServerProtocol { + + // -------------------------------------------------------------------------------------------- + // Constants used in the protocol of the BLOB store + // -------------------------------------------------------------------------------------------- + + /** The buffer size in bytes for network transfers. */ + static final int BUFFER_SIZE = 65536; // 64 K + + /** The maximum key length allowed for storing BLOBs. */ + static final int MAX_KEY_LENGTH = 64; + + /** Internal code to identify a PUT operation. 
*/ + static final byte PUT_OPERATION = 0; + + /** Internal code to identify a GET operation. */ + static final byte GET_OPERATION = 1; + + /** Internal code to identify a DELETE operation. */ + static final byte DELETE_OPERATION = 2; + + /** Internal code to identify a successful operation. */ + static final byte RETURN_OKAY = 0; + + /** Internal code to identify an erroneous operation. */ + static final byte RETURN_ERROR = 1; + + /** Internal code to identify a reference via content hash as the key */ + static final byte CONTENT_ADDRESSABLE = 0; + + /** Internal code to identify a reference via jobId and name as the key */ + static final byte NAME_ADDRESSABLE = 1; + + /** Internal code to identify a reference via jobId as the key */ + static final byte JOB_ID_SCOPE = 2; + + // -------------------------------------------------------------------------------------------- + + private BlobServerProtocol() {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java index 148476f..a2400b5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java @@ -21,27 +21,32 @@ package org.apache.flink.runtime.blob; import java.io.IOException; import java.net.URL; +/** + * A simple store and retrieve binary large objects (BLOBs). + */ public interface BlobService { + /** * This method returns the URL of the file associated with the provided blob key. * - * @param requiredBlob blob key associated with the requested file - * @return URL of the file + * @param key blob key associated with the requested file + * @return The URL to the file. 
* @throws IOException */ - URL getURL(final BlobKey requiredBlob) throws IOException; + URL getURL(BlobKey key) throws IOException; + /** * This method deletes the file associated with the provided blob key. * - * @param blobKey associated with the file to be deleted + * @param key associated with the file to be deleted * @throws IOException */ - void delete(final BlobKey blobKey) throws IOException; + void delete(BlobKey key) throws IOException; /** - * Returns the port of the blob service - * @return the port of the blob service + * Returns the port of the blob service. + * @return the port of the blob service. */ int getPort(); http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java index 53cab1c..5db5ef6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java @@ -23,8 +23,12 @@ import org.apache.commons.io.FileUtils; import org.apache.flink.runtime.jobgraph.JobID; import org.slf4j.Logger; +import java.io.EOFException; import java.io.File; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; import java.nio.charset.Charset; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; @@ -90,10 +94,13 @@ public class BlobUtils { * @return the BLOB server's directory for incoming files */ static File getIncomingDirectory(File storageDir) { - final File incomingDirectory = new File(storageDir, "incoming"); - incomingDirectory.mkdir(); + final File incomingDir = new File(storageDir, "incoming"); - return incomingDirectory; + if (!incomingDir.exists() && !incomingDir.mkdir()) { 
+ throw new RuntimeException("Cannot create directory for incoming files " + incomingDir.getAbsolutePath()); + } + + return incomingDir; } /** @@ -106,7 +113,7 @@ public class BlobUtils { final File cacheDirectory = new File(storageDir, "cache"); if (!cacheDirectory.exists() && !cacheDirectory.mkdir()) { - throw new RuntimeException("Could not create cache directory '" + cacheDirectory + "'."); + throw new RuntimeException("Could not create cache directory '" + cacheDirectory.getAbsolutePath() + "'."); } return cacheDirectory; @@ -119,7 +126,7 @@ public class BlobUtils { * the key identifying the BLOB * @return the (designated) physical storage location of the BLOB */ - static File getStorageLocation(final File storageDir, final BlobKey key) { + static File getStorageLocation(File storageDir, BlobKey key) { return new File(getCacheDirectory(storageDir), BLOB_FILE_PREFIX + key.toString()); } @@ -132,7 +139,7 @@ public class BlobUtils { * the key of the BLOB * @return the (designated) physical storage location of the BLOB with the given job ID and key */ - static File getStorageLocation(final File storageDir, final JobID jobID, final String key) { + static File getStorageLocation(File storageDir, JobID jobID, String key) { return new File(getJobDirectory(storageDir, jobID), BLOB_FILE_PREFIX + encodeKey(key)); } @@ -143,9 +150,12 @@ public class BlobUtils { * the ID of the job to return the storage directory for * @return the storage directory for BLOBs belonging to the job with the given ID */ - private static File getJobDirectory(final File storageDir, final JobID jobID){ + private static File getJobDirectory(File storageDir, JobID jobID) { final File jobDirectory = new File(storageDir, JOB_DIR_PREFIX + jobID.toString()); - jobDirectory.mkdirs(); + + if (!jobDirectory.exists() && !jobDirectory.mkdirs()) { + throw new RuntimeException("Could not create jobId directory '" + jobDirectory.getAbsolutePath() + "'."); + } return jobDirectory; } @@ -157,8 +167,7 @@ public 
class BlobUtils { * the user's key for a BLOB * @return the internal name for the BLOB as used by the BLOB server */ - private static String encodeKey(final String key) { - + private static String encodeKey(String key) { return BaseEncoding.base64().encode(key.getBytes(DEFAULT_CHARSET)); } @@ -168,9 +177,8 @@ public class BlobUtils { * @param jobID * jobID whose directory shall be deleted */ - static void deleteJobDirectory(final File storageDir, final JobID jobID) throws IOException { + static void deleteJobDirectory(File storageDir, JobID jobID) throws IOException { File directory = getJobDirectory(storageDir, jobID); - FileUtils.deleteDirectory(directory); } @@ -220,4 +228,94 @@ public class BlobUtils { return null; } } + + /** + * Auxiliary method to write the length of an upcoming data chunk to an + * output stream. + * + * @param length + * the length of the upcoming data chunk in bytes + * @param outputStream + * the output stream to write the length to + * @throws IOException + * thrown if an I/O error occurs while writing to the output + * stream + */ + static void writeLength(int length, OutputStream outputStream) throws IOException { + byte[] buf = new byte[4]; + buf[0] = (byte) (length & 0xff); + buf[1] = (byte) ((length >> 8) & 0xff); + buf[2] = (byte) ((length >> 16) & 0xff); + buf[3] = (byte) ((length >> 24) & 0xff); + outputStream.write(buf, 0, 4); + } + + /** + * Auxiliary method to read the length of an upcoming data chunk from an + * input stream. 
+ * + * @param inputStream + * the input stream to read the length from + * @return the length of the upcoming data chunk in bytes + * @throws IOException + * thrown if an I/O error occurs while reading from the input + * stream + */ + static int readLength(InputStream inputStream) throws IOException { + byte[] buf = new byte[4]; + int bytesRead = 0; + while (bytesRead < 4) { + final int read = inputStream.read(buf, bytesRead, 4 - bytesRead); + if (read < 0) { + throw new EOFException("Read an incomplete length"); + } + bytesRead += read; + } + + bytesRead = buf[0] & 0xff; + bytesRead |= (buf[1] & 0xff) << 8; + bytesRead |= (buf[2] & 0xff) << 16; + bytesRead |= (buf[3] & 0xff) << 24; + + return bytesRead; + } + + /** + * Auxiliary method to read a particular number of bytes from an input stream. This method blocks until the + * requested number of bytes have been read from the stream. If the stream cannot offer enough data, an + * {@link EOFException} is thrown. + * + * @param inputStream The input stream to read the data from. + * @param buf The buffer to store the read data. + * @param off The offset inside the buffer. + * @param len The number of bytes to read from the stream. + * @param type The name of the type, to throw a good error message in case of not enough data. + * @throws IOException + * Thrown if I/O error occurs while reading from the stream or the stream cannot offer enough data. 
+ */ + static void readFully(InputStream inputStream, byte[] buf, int off, int len, String type) throws IOException { + + int bytesRead = 0; + while (bytesRead < len) { + + final int read = inputStream.read(buf, off + bytesRead, len + - bytesRead); + if (read < 0) { + throw new EOFException("Received an incomplete " + type); + } + bytesRead += read; + } + } + + static void closeSilently(Socket socket, Logger LOG) { + if (socket != null) { + try { + socket.close(); + } catch (Throwable t) { + if (LOG.isDebugEnabled()) { + LOG.debug("Error while closing resource after BLOB transfer.", t); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/AbstractIDTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/AbstractIDTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/AbstractIDTest.java index 7f7575b..ba9add5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/AbstractIDTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/AbstractIDTest.java @@ -22,10 +22,13 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import org.apache.flink.runtime.jobgraph.JobID; import org.junit.Test; import org.apache.flink.runtime.testutils.CommonTestUtils; +import java.nio.ByteBuffer; + /** * This class contains tests for the {@link org.apache.flink.runtime.AbstractID} class. 
*/ @@ -48,6 +51,45 @@ public class AbstractIDTest { fail(e.getMessage()); } } + + @Test + public void testConvertToBytes() { + try { + AbstractID origID = new AbstractID(); + + AbstractID copy1 = new AbstractID(origID); + AbstractID copy2 = new AbstractID(origID.getBytes()); + AbstractID copy3 = new AbstractID(origID.getLowerPart(), origID.getUpperPart()); + + assertEquals(origID, copy1); + assertEquals(origID, copy2); + assertEquals(origID, copy3); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testConvertToByteBuffer() { + try { + JobID origID = new JobID(); + + byte[] bytes = origID.getBytes(); + ByteBuffer buffer = ByteBuffer.wrap(bytes); + + JobID copy1 = JobID.fromByteBuffer(buffer); + JobID copy2 = JobID.fromByteArray(bytes); + + assertEquals(origID, copy1); + assertEquals(origID, copy2); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } @Test public void testCompare() { http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java new file mode 100644 index 0000000..aba0aff --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.configuration.Configuration; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.net.URL; + +import static org.junit.Assert.*; + +/** + * Unit tests for the blob cache retrying the connection to the server. + */ +public class BlobCacheRetriesTest { + + /** + * A test where the connection fails twice and then the get operation succeeds. + */ + @Test + public void testBlobFetchRetries() { + + final byte[] data = new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 0}; + + BlobServer server = null; + BlobCache cache = null; + try { + final Configuration config = new Configuration(); + + server = new TestingFailingBlobServer(config, 2); + + final InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + + // upload some blob + BlobClient blobClient = null; + BlobKey key; + try { + blobClient = new BlobClient(serverAddress); + + key = blobClient.put(data); + } + finally { + if (blobClient != null) { + blobClient.close(); + } + } + + cache = new BlobCache(serverAddress, config); + + // trigger a download - it should fail on the first time, but retry, and succeed at the second time + URL url = cache.getURL(key); + InputStream is = url.openStream(); + try { + byte[] received = new byte[data.length]; + assertEquals(data.length, is.read(received)); + assertArrayEquals(data, received); + } + finally { + is.close(); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); 
+ } + finally { + if (cache != null) { + cache.shutdown(); + } + if (server != null) { + server.shutdown(); + } + } + } + + /** + * A test where the connection fails too often and eventually fails the GET request. + */ + @Test + public void testBlobFetchWithTooManyFailures() { + + final byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 }; + + BlobServer server = null; + BlobCache cache = null; + try { + final Configuration config = new Configuration(); + + server = new TestingFailingBlobServer(config, 10); + + final InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + + // upload some blob + BlobClient blobClient = null; + BlobKey key; + try { + blobClient = new BlobClient(serverAddress); + + key = blobClient.put(data); + } + finally { + if (blobClient != null) { + blobClient.close(); + } + } + + cache = new BlobCache(serverAddress, config); + + // trigger a download - it should fail eventually + try { + cache.getURL(key); + fail("This should fail"); + } + catch (IOException e) { + // as we expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + if (cache != null) { + cache.shutdown(); + } + if (server != null) { + server.shutdown(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java new file mode 100644 index 0000000..4b92b71 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.net.InetSocketAddress; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.configuration.Configuration; +import org.junit.Test; + +/** + * This class contains unit tests for the {@link BlobCache}. 
+ */ +public class BlobCacheSuccessTest { + + @Test + public void testBlobCache() { + + // First create two BLOBs and upload them to BLOB server + final byte[] buf = new byte[128]; + final List<BlobKey> blobKeys = new ArrayList<BlobKey>(2); + + BlobServer blobServer = null; + BlobCache blobCache = null; + try { + + // Start the BLOB server + blobServer = new BlobServer(new Configuration()); + final InetSocketAddress serverAddress = new InetSocketAddress(blobServer.getPort()); + + // Upload BLOBs + BlobClient blobClient = null; + try { + + blobClient = new BlobClient(serverAddress); + + blobKeys.add(blobClient.put(buf)); + buf[0] = 1; // Make sure the BLOB key changes + blobKeys.add(blobClient.put(buf)); + } finally { + if (blobClient != null) { + blobClient.close(); + } + } + + blobCache = new BlobCache(serverAddress, new Configuration()); + + for(int i = 0; i < blobKeys.size(); i++){ + blobCache.getURL(blobKeys.get(i)); + } + + // Now, shut down the BLOB server, the BLOBs must still be accessible through the cache. 
+ blobServer.shutdown(); + blobServer = null; + + final URL[] urls = new URL[blobKeys.size()]; + + for(int i = 0; i < blobKeys.size(); i++){ + urls[i] = blobCache.getURL(blobKeys.get(i)); + } + + // Verify the result + assertEquals(blobKeys.size(), urls.length); + + for (int i = 0; i < urls.length; ++i) { + + final URL url = urls[i]; + + assertNotNull(url); + + try { + final File cachedFile = new File(url.toURI()); + + assertTrue(cachedFile.exists()); + assertEquals(buf.length, cachedFile.length()); + + } catch (URISyntaxException e) { + fail(e.getMessage()); + } + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + if (blobServer != null) { + blobServer.shutdown(); + } + + if(blobCache != null){ + blobCache.shutdown(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java deleted file mode 100644 index 32c8c3a..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.blob; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.URISyntaxException; -import java.net.URL; -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.configuration.Configuration; -import org.junit.Test; - -/** - * This class contains unit tests for the {@link BlobCache}. - */ -public class BlobCacheTest { - - @Test - public void testBlobCache() { - - // First create two BLOBs and upload them to BLOB server - final byte[] buf = new byte[128]; - final List<BlobKey> blobKeys = new ArrayList<BlobKey>(2); - - BlobServer blobServer = null; - BlobCache blobCache = null; - try { - - // Start the BLOB server - blobServer = new BlobServer(new Configuration()); - final InetSocketAddress serverAddress = new InetSocketAddress(blobServer.getServerPort()); - - // Upload BLOBs - BlobClient blobClient = null; - try { - - blobClient = new BlobClient(serverAddress); - - blobKeys.add(blobClient.put(buf)); - buf[0] = 1; // Make sure the BLOB key changes - blobKeys.add(blobClient.put(buf)); - } finally { - if (blobClient != null) { - blobClient.close(); - } - } - - blobCache = new BlobCache(serverAddress, new Configuration()); - - for(int i = 0; i < blobKeys.size(); i++){ - blobCache.getURL(blobKeys.get(i)); - } - - // Now, shut down the BLOB server, the BLOBs must still be 
accessible through the cache. - blobServer.shutdown(); - blobServer = null; - - final URL[] urls = new URL[blobKeys.size()]; - - for(int i = 0; i < blobKeys.size(); i++){ - urls[i] = blobCache.getURL(blobKeys.get(i)); - } - - // Verify the result - assertEquals(blobKeys.size(), urls.length); - - for (int i = 0; i < urls.length; ++i) { - - final URL url = urls[i]; - - assertNotNull(url); - - try { - final File cachedFile = new File(url.toURI()); - - assertTrue(cachedFile.exists()); - assertEquals(buf.length, cachedFile.length()); - - } catch (URISyntaxException e) { - fail(e.getMessage()); - } - - } - - } catch (IOException ioe) { - fail(ioe.getMessage()); - } finally { - if (blobServer != null) { - blobServer.shutdown(); - } - - if(blobCache != null){ - blobCache.shutdown(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java index 1465777..2254d7c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java @@ -24,7 +24,6 @@ import static org.junit.Assert.fail; import java.io.EOFException; import java.io.File; import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; @@ -33,7 +32,6 @@ import java.security.MessageDigest; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.JobID; -import org.apache.flink.util.StringUtils; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -43,14 +41,10 @@ import org.junit.Test; */ public class BlobClientTest { 
- /** - * The buffer size used during the tests in bytes. - */ + /** The buffer size used during the tests in bytes. */ private static final int TEST_BUFFER_SIZE = 17 * 1000; - /** - * The instance of the BLOB server used during the tests. - */ + /** The instance of the BLOB server used during the tests. */ private static BlobServer BLOB_SERVER; /** @@ -60,10 +54,11 @@ public class BlobClientTest { public static void startServer() { try { BLOB_SERVER = new BlobServer(new Configuration()); - } catch (IOException ioe) { - fail(StringUtils.stringifyException(ioe)); } - + catch (IOException e) { + e.printStackTrace(); + fail(e.getMessage()); + } } /** @@ -82,13 +77,10 @@ public class BlobClientTest { * @return a test buffer filled with a specific byte pattern */ private static byte[] createTestBuffer() { - final byte[] buf = new byte[TEST_BUFFER_SIZE]; - for (int i = 0; i < buf.length; ++i) { buf[i] = (byte) (i % 128); } - return buf; } @@ -102,7 +94,7 @@ public class BlobClientTest { * @throws IOException * thrown if an I/O error occurs while writing to the test file */ - private static BlobKey prepareTestFile(final File file) throws IOException { + private static BlobKey prepareTestFile(File file) throws IOException { MessageDigest md = BlobUtils.createMessageDigest(); @@ -203,44 +195,44 @@ public class BlobClientTest { @Test public void testContentAddressableBuffer() { - final byte[] testBuffer = createTestBuffer(); - final MessageDigest md = BlobUtils.createMessageDigest(); - md.update(testBuffer); - final BlobKey origKey = new BlobKey(md.digest()); + BlobClient client = null; try { + byte[] testBuffer = createTestBuffer(); + MessageDigest md = BlobUtils.createMessageDigest(); + md.update(testBuffer); + BlobKey origKey = new BlobKey(md.digest()); - BlobClient client = null; - try { + InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort()); + client = new BlobClient(serverAddress); - final InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", BLOB_SERVER.getServerPort()); - client = new BlobClient(serverAddress); + // Store the data + BlobKey receivedKey = client.put(testBuffer); + assertEquals(origKey, receivedKey); - // Store the data - final BlobKey receivedKey = client.put(testBuffer); - assertEquals(origKey, receivedKey); + // Retrieve the data + InputStream is = client.get(receivedKey); + validateGet(is, testBuffer); - // Retrieve the data - final InputStream is = client.get(receivedKey); - validateGet(is, testBuffer); - - // Check reaction to invalid keys + // Check reaction to invalid keys + try { + client.get(new BlobKey()); + fail("Expected IOException did not occur"); + } + catch (IOException fnfe) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + if (client != null) { try { - client.get(new BlobKey()); - } catch (FileNotFoundException fnfe) { - return; - } - - fail("Expected FileNotFoundException did not occur"); - - } finally { - if (client != null) { client.close(); - } + } catch (Throwable t) {} } - - } catch (IOException ioe) { - fail(StringUtils.stringifyException(ioe)); } } @@ -250,42 +242,45 @@ public class BlobClientTest { @Test public void testContentAddressableStream() { - try { + BlobClient client = null; + InputStream is = null; - final File testFile = File.createTempFile("testfile", ".dat"); + try { + File testFile = File.createTempFile("testfile", ".dat"); testFile.deleteOnExit(); - final BlobKey origKey = prepareTestFile(testFile); - BlobClient client = null; - InputStream is = null; - try { - - final InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getServerPort()); - client = new BlobClient(serverAddress); + BlobKey origKey = prepareTestFile(testFile); - // Store the data - is = new FileInputStream(testFile); - final BlobKey receivedKey = client.put(is); - assertEquals(origKey, receivedKey); + InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", BLOB_SERVER.getPort()); + client = new BlobClient(serverAddress); - is.close(); - is = null; + // Store the data + is = new FileInputStream(testFile); + BlobKey receivedKey = client.put(is); + assertEquals(origKey, receivedKey); - // Retrieve the data - is = client.get(receivedKey); - validateGet(is, testFile); + is.close(); + is = null; - } finally { - if (is != null) { + // Retrieve the data + is = client.get(receivedKey); + validateGet(is, testFile); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + if (is != null) { + try { is.close(); - } - if (client != null) { + } catch (Throwable t) {} + } + if (client != null) { + try { client.close(); - } + } catch (Throwable t) {} } - - } catch (IOException ioe) { - fail(StringUtils.stringifyException(ioe)); } } @@ -300,11 +295,9 @@ public class BlobClientTest { final String key = "testkey"; try { - BlobClient client = null; try { - - final InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getServerPort()); + final InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort()); client = new BlobClient(serverAddress); // Store the data @@ -320,20 +313,21 @@ public class BlobClientTest { // Check if the BLOB is still available try { client.get(jobID, key); - } catch (FileNotFoundException fnfe) { - return; + fail("Expected IOException did not occur"); } - - fail("Expected FileNotFoundException did not occur"); - - } finally { + catch (IOException e) { + // expected + } + } + finally { if (client != null) { client.close(); } } - - } catch (IOException ioe) { - fail(StringUtils.stringifyException(ioe)); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); } } @@ -355,7 +349,7 @@ public class BlobClientTest { InputStream is = null; try { - final InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getServerPort()); + final InetSocketAddress 
serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort()); client = new BlobClient(serverAddress); // Store the data @@ -369,7 +363,8 @@ public class BlobClientTest { is = client.get(jobID, key); validateGet(is, testFile); - } finally { + } + finally { if (is != null) { is.close(); } @@ -378,8 +373,10 @@ public class BlobClientTest { } } - } catch (IOException ioe) { - fail(StringUtils.stringifyException(ioe)); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java new file mode 100644 index 0000000..adb3bfc --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.JobID; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Random; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests how DELETE requests behave. + */ +public class BlobServerDeleteTest { + + private final Random rnd = new Random(); + + @Test + public void testDeleteSingle() { + BlobServer server = null; + BlobClient client = null; + + try { + Configuration config = new Configuration(); + server = new BlobServer(config); + + InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + client = new BlobClient(serverAddress); + + byte[] data = new byte[2000000]; + rnd.nextBytes(data); + + // put content addressable (like libraries) + BlobKey key = client.put(data); + assertNotNull(key); + + // issue a DELETE request + client.delete(key); + client.close(); + + client = new BlobClient(serverAddress); + try { + client.get(key); + fail("BLOB should have been deleted"); + } + catch (IOException e) { + // expected + } + + try { + client.put(new byte[1]); + fail("client should be closed after erroneous operation"); + } + catch (IllegalStateException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + if (client != null) { + try { + client.close(); + } catch (Throwable t) { + t.printStackTrace(); + } + } + if (server != null) { + server.shutdown(); + } + } + } + + @Test + public void testDeleteAll() { + BlobServer server = null; + BlobClient client = null; + + try { + Configuration config = new Configuration(); + server = new BlobServer(config); + + InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + client = new 
BlobClient(serverAddress); + + byte[] data = new byte[2000000]; + rnd.nextBytes(data); + + JobID jobID = new JobID(); + String name1 = "random name"; + String name2 = "any nyme"; + + // put under job and name scope + client.put(jobID, name1, data); + client.put(jobID, name2, new byte[712]); + + + // issue a DELETE ALL request + client.deleteAll(jobID); + client.close(); + + client = new BlobClient(serverAddress); + try { + client.get(jobID, name1); + fail("BLOB should have been deleted"); + } + catch (IOException e) { + // expected + } + + try { + client.put(new byte[1]); + fail("client should be closed after erroneous operation"); + } + catch (IllegalStateException e) { + // expected + } + + client = new BlobClient(serverAddress); + try { + client.get(jobID, name2); + fail("BLOB should have been deleted"); + } + catch (IOException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + if (client != null) { + try { + client.close(); + } catch (Throwable t) { + t.printStackTrace(); + } + } + if (server != null) { + server.shutdown(); + } + } + } + + @Test + public void testDeleteAlreadyDeletedByBlobKey() { + BlobServer server = null; + BlobClient client = null; + + try { + Configuration config = new Configuration(); + server = new BlobServer(config); + + InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + client = new BlobClient(serverAddress); + + byte[] data = new byte[2000000]; + rnd.nextBytes(data); + + // put content addressable (like libraries) + BlobKey key = client.put(data); + assertNotNull(key); + + File blobFile = server.getStorageLocation(key); + assertTrue(blobFile.delete()); + + // issue a DELETE request + try { + client.delete(key); + } + catch (IOException e) { + fail("DELETE operation should not fail if file is already deleted"); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + if (client != 
null) { + try { + client.close(); + } catch (Throwable t) { + t.printStackTrace(); + } + } + if (server != null) { + server.shutdown(); + } + } + } + + @Test + public void testDeleteAlreadyDeletedByName() { + BlobServer server = null; + BlobClient client = null; + + try { + Configuration config = new Configuration(); + server = new BlobServer(config); + + InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + client = new BlobClient(serverAddress); + + byte[] data = new byte[2000000]; + rnd.nextBytes(data); + + JobID jid = new JobID(); + String name = "------------fdghljEgRJHF+##4U789Q345"; + + client.put(jid, name, data); + + File blobFile = server.getStorageLocation(jid, name); + assertTrue(blobFile.delete()); + + // issue a DELETE request + try { + client.delete(jid, name); + } + catch (IOException e) { + fail("DELETE operation should not fail if file is already deleted"); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + if (client != null) { + try { + client.close(); + } catch (Throwable t) { + t.printStackTrace(); + } + } + if (server != null) { + server.shutdown(); + } + } + } + + @Test + public void testDeleteFails() { + BlobServer server = null; + BlobClient client = null; + + try { + Configuration config = new Configuration(); + server = new BlobServer(config); + + InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + client = new BlobClient(serverAddress); + + byte[] data = new byte[2000000]; + rnd.nextBytes(data); + + // put content addressable (like libraries) + BlobKey key = client.put(data); + assertNotNull(key); + + File blobFile = server.getStorageLocation(key); + File directory = blobFile.getParentFile(); + + assertTrue(blobFile.setWritable(false, false)); + assertTrue(directory.setWritable(false, false)); + + // issue a DELETE request + try { + client.delete(key); + fail("DELETE operation should fail if file cannot be deleted"); + 
} + catch (IOException e) { + // expected + } + finally { + blobFile.setWritable(true, false); + directory.setWritable(true, false); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + if (client != null) { + try { + client.close(); + } catch (Throwable t) { + t.printStackTrace(); + } + } + if (server != null) { + server.shutdown(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java new file mode 100644 index 0000000..c564670 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.configuration.Configuration; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; + +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.util.Random; + +import static org.junit.Assert.*; + +/** + * Tests how failing GET requests behave in the presence of failures. + * Successful GET requests are tested in conjunction with the PUT + * requests. + */ +public class BlobServerGetTest { + + private final Random rnd = new Random(); + + @Test + public void testGetFailsDuringLookup() { + BlobServer server = null; + BlobClient client = null; + + try { + Configuration config = new Configuration(); + server = new BlobServer(config); + + InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + client = new BlobClient(serverAddress); + + byte[] data = new byte[2000000]; + rnd.nextBytes(data); + + // put content addressable (like libraries) + BlobKey key = client.put(data); + assertNotNull(key); + + // delete all files to make sure that GET requests fail + File blobFile = server.getStorageLocation(key); + assertTrue(blobFile.delete()); + + // issue a GET request that fails + try { + client.get(key); + fail("This should not succeed."); + } + catch (IOException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + if (client != null) { + try { + client.close(); + } catch (Throwable t) { + t.printStackTrace(); + } + } + if (server != null) { + server.shutdown(); + } + } + } + + @Test + public void testGetFailsDuringStreaming() { + BlobServer server = null; + BlobClient client = null; + + try { + Configuration config = new Configuration(); + server = new BlobServer(config); + + InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + client = new BlobClient(serverAddress); + + byte[] data = new byte[5000000]; + rnd.nextBytes(data); + + 
// put content addressable (like libraries) + BlobKey key = client.put(data); + assertNotNull(key); + + // issue a GET request that succeeds + InputStream is = client.get(key); + + byte[] receiveBuffer = new byte[50000]; + BlobUtils.readFully(is, receiveBuffer, 0, receiveBuffer.length, null); + BlobUtils.readFully(is, receiveBuffer, 0, receiveBuffer.length, null); + + // shut down the server + for (BlobServerConnection conn : server.getCurrentyActiveConnections()) { + conn.close(); + } + + try { + byte[] remainder = new byte[data.length - 2*receiveBuffer.length]; + BlobUtils.readFully(is, remainder, 0, remainder.length, null); + fail(); + } + catch (IOException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + if (client != null) { + try { + client.close(); + } catch (Throwable t) { + t.printStackTrace(); + } + } + if (server != null) { + server.shutdown(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java new file mode 100644 index 0000000..1f8d29b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.JobID; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.util.Random; + +import static org.junit.Assert.*; + +/** + * Tests for successful and failing PUT operations against the BLOB server, + * and successful GET operations. + */ +public class BlobServerPutTest { + + private final Random rnd = new Random(); + + @Test + public void testPutBufferSuccessful() { + BlobServer server = null; + BlobClient client = null; + + try { + Configuration config = new Configuration(); + server = new BlobServer(config); + + InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + client = new BlobClient(serverAddress); + + byte[] data = new byte[2000000]; + rnd.nextBytes(data); + + // put content addressable (like libraries) + BlobKey key1 = client.put(data); + assertNotNull(key1); + + BlobKey key2 = client.put(data, 10, 44); + assertNotNull(key2); + + // put under job and name scope + JobID jid = new JobID(); + String stringKey = "my test key"; + client.put(jid, stringKey, data); + + // --- GET the data and check that it is equal --- + + // one get request on the same client + InputStream is1 = client.get(key2); + byte[] result1 = new byte[44]; + BlobUtils.readFully(is1, result1, 0, result1.length, null); + is1.close(); + + for (int i = 0, j = 10; i < 
result1.length; i++, j++) { + assertEquals(data[j], result1[i]); + } + + // close the client and create a new one for the remaining requests + client.close(); + client = new BlobClient(serverAddress); + + InputStream is2 = client.get(key1); + byte[] result2 = new byte[data.length]; + BlobUtils.readFully(is2, result2, 0, result2.length, null); + is2.close(); + assertArrayEquals(data, result2); + + InputStream is3 = client.get(jid, stringKey); + byte[] result3 = new byte[data.length]; + BlobUtils.readFully(is3, result3, 0, result3.length, null); + is3.close(); + assertArrayEquals(data, result3); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + if (client != null) { + try { + client.close(); + } catch (Throwable t) { + t.printStackTrace(); + } + } + if (server != null) { + server.shutdown(); + } + } + } + + + @Test + public void testPutStreamSuccessful() { + BlobServer server = null; + BlobClient client = null; + + try { + Configuration config = new Configuration(); + server = new BlobServer(config); + + InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + client = new BlobClient(serverAddress); + + byte[] data = new byte[2000000]; + rnd.nextBytes(data); + + // put content addressable (like libraries) + { + BlobKey key1 = client.put(new ByteArrayInputStream(data)); + assertNotNull(key1); + + } + + // put under job and name scope + { + JobID jid = new JobID(); + String stringKey = "my test key"; + client.put(jid, stringKey, new ByteArrayInputStream(data)); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + if (client != null) { + try { + client.close(); + } catch (Throwable t) { + t.printStackTrace(); + } + } + if (server != null) { + server.shutdown(); + } + } + } + + @Test + public void testPutChunkedStreamSuccessful() { + BlobServer server = null; + BlobClient client = null; + + try { + Configuration config = new Configuration(); + server = 
new BlobServer(config); + + InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + client = new BlobClient(serverAddress); + + byte[] data = new byte[2000000]; + rnd.nextBytes(data); + + // put content addressable (like libraries) + { + BlobKey key1 = client.put(new ChunkedInputStream(data, 19)); + assertNotNull(key1); + + } + + // put under job and name scope + { + JobID jid = new JobID(); + String stringKey = "my test key"; + client.put(jid, stringKey, new ChunkedInputStream(data, 17)); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + if (client != null) { + try { + client.close(); + } catch (Throwable t) { + t.printStackTrace(); + } + } + if (server != null) { + server.shutdown(); + } + } + } + + @Test + public void testPutBufferFails() { + BlobServer server = null; + BlobClient client = null; + + File tempFileDir = null; + try { + Configuration config = new Configuration(); + server = new BlobServer(config); + + // make sure the blob server cannot create any files in its storage dir + tempFileDir = server.createTemporaryFilename().getParentFile().getParentFile(); + assertTrue(tempFileDir.setExecutable(true, false)); + assertTrue(tempFileDir.setReadable(true, false)); + assertTrue(tempFileDir.setWritable(false, false)); + + InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + client = new BlobClient(serverAddress); + + byte[] data = new byte[2000000]; + rnd.nextBytes(data); + + // put content addressable (like libraries) + try { + client.put(data); + fail("This should fail."); + } + catch (IOException e) { + assertTrue(e.getMessage(), e.getMessage().contains("Server side error")); + } + + try { + client.put(data); + fail("Client should be closed"); + } + catch (IllegalStateException e) { + // expected + } + + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + // set writable again to make sure we can 
remove the directory + if (tempFileDir != null) { + tempFileDir.setWritable(true, false); + } + if (client != null) { + try { + client.close(); + } catch (Throwable t) { + t.printStackTrace(); + } + } + if (server != null) { + server.shutdown(); + } + } + } + + @Test + public void testPutNamedBufferFails() { + BlobServer server = null; + BlobClient client = null; + + File tempFileDir = null; + try { + Configuration config = new Configuration(); + server = new BlobServer(config); + + // make sure the blob server cannot create any files in its storage dir + tempFileDir = server.createTemporaryFilename().getParentFile().getParentFile(); + assertTrue(tempFileDir.setExecutable(true, false)); + assertTrue(tempFileDir.setReadable(true, false)); + assertTrue(tempFileDir.setWritable(false, false)); + + InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + client = new BlobClient(serverAddress); + + byte[] data = new byte[2000000]; + rnd.nextBytes(data); + + // put under job and name scope + try { + JobID jid = new JobID(); + String stringKey = "my test key"; + client.put(jid, stringKey, data); + fail("This should fail."); + } + catch (IOException e) { + assertTrue(e.getMessage(), e.getMessage().contains("Server side error")); + } + + try { + JobID jid = new JobID(); + String stringKey = "another key"; + client.put(jid, stringKey, data); + fail("Client should be closed"); + } + catch (IllegalStateException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + // set writable again to make sure we can remove the directory + if (tempFileDir != null) { + tempFileDir.setWritable(true, false); + } + if (client != null) { + try { + client.close(); + } catch (Throwable t) { + t.printStackTrace(); + } + } + if (server != null) { + server.shutdown(); + } + } + } + + // -------------------------------------------------------------------------------------------- + + private static final 
class ChunkedInputStream extends InputStream {

	/** The input bytes, split into consecutive chunks. */
	private final byte[][] chunks;

	/** Index of the chunk that is currently being read. */
	private int chunkIndex = 0;

	/** Position of the next unread byte inside the current chunk. */
	private int posInChunk = 0;

	/**
	 * Copies {@code data} into {@code numChunks} consecutive chunks. The first
	 * {@code numChunks - 1} chunks hold {@code data.length / numChunks} bytes each,
	 * the last chunk holds all remaining bytes. Chunks may be empty when there are
	 * more chunks than bytes.
	 *
	 * <p>Deliberately not private, so that the enclosing test class can call it directly.
	 *
	 * @param data the bytes this stream returns
	 * @param numChunks the number of chunks to split the bytes into, must be positive
	 * @throws IllegalArgumentException if {@code numChunks} is not positive
	 */
	ChunkedInputStream(byte[] data, int numChunks) {
		if (numChunks <= 0) {
			throw new IllegalArgumentException("numChunks must be positive, but was " + numChunks);
		}

		this.chunks = new byte[numChunks][];

		final int bytesPerChunk = data.length / numChunks;
		int bytesTaken = 0;
		for (int i = 0; i < numChunks - 1; i++, bytesTaken += bytesPerChunk) {
			this.chunks[i] = new byte[bytesPerChunk];
			System.arraycopy(data, bytesTaken, this.chunks[i], 0, bytesPerChunk);
		}

		// the last chunk takes whatever the integer division left over
		final byte[] last = new byte[data.length - bytesTaken];
		System.arraycopy(data, bytesTaken, last, 0, last.length);
		this.chunks[numChunks - 1] = last;
	}

	/**
	 * Reads the next byte.
	 *
	 * @return the next byte as a value between 0 and 255, or -1 at the end of the stream
	 */
	@Override
	public int read() {
		if (!moveToUnreadByte()) {
			return -1;
		}
		// mask to the unsigned range: a raw (byte) 0xFF would otherwise be returned
		// as -1 and be mistaken for the end of the stream
		return chunks[chunkIndex][posInChunk++] & 0xff;
	}

	/**
	 * Reads up to {@code len} bytes, but never more than what is left in the current
	 * chunk, so that a single call never crosses a chunk boundary.
	 *
	 * @param b the buffer to copy the bytes into
	 * @param off the position in {@code b} at which to store the first byte
	 * @param len the maximum number of bytes to read
	 * @return the number of bytes copied, 0 if {@code len} is 0, or -1 at the end of the stream
	 */
	@Override
	public int read(byte[] b, int off, int len) throws IOException {
		if (len == 0) {
			return 0;
		}
		if (!moveToUnreadByte()) {
			return -1;
		}

		final byte[] curr = chunks[chunkIndex];
		final int toCopy = Math.min(len, curr.length - posInChunk);
		System.arraycopy(curr, posInChunk, b, off, toCopy);
		posInChunk += toCopy;
		return toCopy;
	}

	/**
	 * Skips over exhausted (or empty) chunks until the read position points at an unread byte.
	 * Implemented as a loop so that long runs of empty chunks do not grow the call stack.
	 *
	 * @return true if there is an unread byte at the current position, false if all chunks are consumed
	 */
	private boolean moveToUnreadByte() {
		while (chunkIndex < chunks.length) {
			if (posInChunk < chunks[chunkIndex].length) {
				return true;
			}
			posInChunk = 0;
			chunkIndex++;
		}
		return false;
	}
}
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.blob; + +import org.apache.flink.configuration.Configuration; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; + +public class TestingFailingBlobServer extends BlobServer { + + private int numFailures; + + public TestingFailingBlobServer(Configuration config, int numFailures) throws IOException { + super(config); + this.numFailures = numFailures; + } + + @Override + public void run() { + + // we do properly the first operation (PUT) + try { + new BlobServerConnection(getServerSocket().accept(), this).start(); + } + catch (Throwable t) { + t.printStackTrace(); + } + + // do some failing operations + for (int num = 0; num < numFailures && !isShutdown(); num++) { + Socket socket = null; + try { + socket = getServerSocket().accept(); + InputStream is = socket.getInputStream(); + OutputStream os = socket.getOutputStream(); + + // just abort everything + is.close(); + os.close(); + socket.close(); + } + catch (IOException e) { + } + finally { + if (socket != null) { + try { + socket.close(); + } catch(Throwable t) {} + } + } + } + + // regular runs + super.run(); + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java index 5029b15..1d2b6ac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java @@ -52,7 +52,7 @@ public class BlobLibraryCacheManagerTest { try { server = new BlobServer(new Configuration()); - InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getServerPort()); + InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getPort()); BlobClient bc = new BlobClient(blobSocketAddress); keys.add(bc.put(buf));
