http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java new file mode 100644 index 0000000..de5c934 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -0,0 +1,335 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import com.google.common.base.Preconditions; + +import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.fs.Syncable; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; + +/** + * The BlobFsOutputStream for Rest AbfsClient + */ +public class AbfsOutputStream extends OutputStream implements Syncable { + private final AbfsClient client; + private final String path; + private long position; + private boolean closed; + private volatile IOException lastError; + + private long lastFlushOffset; + private long lastTotalAppendOffset = 0; + + private final int bufferSize; + private byte[] buffer; + private int bufferIndex; + private final int maxConcurrentRequestCount; + + private ConcurrentLinkedDeque<WriteOperation> writeOperations; + private final ThreadPoolExecutor threadExecutor; + private final ExecutorCompletionService<Void> completionService; + + public AbfsOutputStream( + final AbfsClient client, + final String path, + final long position, + final int bufferSize) { + this.client = client; + this.path = path; + this.position = position; + this.closed = false; + this.lastError = null; + this.lastFlushOffset = 0; + this.bufferSize = bufferSize; + this.buffer = new byte[bufferSize]; + this.bufferIndex = 0; + this.writeOperations = new ConcurrentLinkedDeque<>(); + + this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); + + this.threadExecutor + = new ThreadPoolExecutor(maxConcurrentRequestCount, + maxConcurrentRequestCount, + 10L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>()); + this.completionService = new ExecutorCompletionService<>(this.threadExecutor); + } + 
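+ // How the pieces above fit together: write() fills `buffer`; each full
+ // buffer is submitted to `completionService` as an asynchronous append and
+ // tracked in `writeOperations`; writers block in waitForTaskToComplete()
+ // once the executor queue holds 2 * maxConcurrentRequestCount tasks.
+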
/** + * Writes the specified byte to this output stream. The general contract for + * write is that one byte is written to the output stream. The byte to be + * written is the eight low-order bits of the argument byteVal. The 24 high-order + * bits of byteVal are ignored. + * + * @param byteVal the byte value to write. + * @throws IOException if an I/O error occurs. In particular, an IOException may be + * thrown if the output stream has been closed. + */ + @Override + public void write(final int byteVal) throws IOException { + write(new byte[]{(byte) (byteVal & 0xFF)}); + } + + /** + * Writes length bytes from the specified byte array starting at off to + * this output stream. + * + * @param data the byte array to write. + * @param off the start offset in the data. + * @param length the number of bytes to write. + * @throws IOException if an I/O error occurs. In particular, an IOException may be + * thrown if the output stream has been closed. + */ + @Override + public synchronized void write(final byte[] data, final int off, final int length) + throws IOException { + if (this.lastError != null) { + throw this.lastError; + } + + Preconditions.checkArgument(data != null, "null data"); + + if (off < 0 || length < 0 || length > data.length - off) { + throw new IndexOutOfBoundsException(); + } + + int currentOffset = off; + int writableBytes = bufferSize - bufferIndex; + int numberOfBytesToWrite = length; + + while (numberOfBytesToWrite > 0) { + if (writableBytes <= numberOfBytesToWrite) { + System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes); + bufferIndex += writableBytes; + writeCurrentBufferToService(); + currentOffset += writableBytes; + numberOfBytesToWrite = numberOfBytesToWrite - writableBytes; + } else { + System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite); + bufferIndex += numberOfBytesToWrite; + numberOfBytesToWrite = 0; + } + + writableBytes = bufferSize - bufferIndex; + } + } + + /** + * Flushes this output stream and forces any buffered output bytes to be + * written out. If any data remains in the payload it is committed to the + * service. Data is queued for writing and forced out to the service + * before the call returns. + */ + @Override + public void flush() throws IOException { + this.flushInternalAsync(); + } + + /** Similar to posix fsync, flush out the data in client's user buffer + * all the way to the disk device (but the disk may have it in its cache). + * @throws IOException if error occurs + */ + @Override + public void hsync() throws IOException { + this.flushInternal(); + } + + /** Flush out the data in client's user buffer. After the return of + * this call, new readers will see the data. + * @throws IOException if any error occurs + */ + @Override + public void hflush() throws IOException { + this.flushInternal(); + } + + /** + * Force all data in the output stream to be written to Azure storage. + * Wait to return until this is complete. Close the access to the stream and + * shutdown the upload thread pool. + * If the blob was created, its lease will be released. + * Any error caught in the worker threads and stored will be rethrown here + * after cleanup. 
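+ * Note that close() is idempotent: once the stream has been marked closed,
+ * subsequent calls return immediately without repeating the flush.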
+ */ + @Override + public synchronized void close() throws IOException { + if (closed) { + return; + } + + try { + this.flushInternal(); + this.threadExecutor.shutdown(); + } finally { + this.lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + this.buffer = null; + this.bufferIndex = 0; + this.closed = true; + this.writeOperations.clear(); + if (!this.threadExecutor.isShutdown()) { + this.threadExecutor.shutdownNow(); + } + } + } + + private synchronized void flushInternal() throws IOException { + if (this.lastError != null) { + throw this.lastError; + } + this.writeCurrentBufferToService(); + this.flushWrittenBytesToService(); + } + + private synchronized void flushInternalAsync() throws IOException { + if (this.lastError != null) { + throw this.lastError; + } + this.writeCurrentBufferToService(); + this.flushWrittenBytesToServiceAsync(); + } + + private synchronized void writeCurrentBufferToService() throws IOException { + if (bufferIndex == 0) { + return; + } + + final byte[] bytes = this.buffer; + final int bytesLength = bufferIndex; + + this.buffer = new byte[bufferSize]; + this.bufferIndex = 0; + final long offset = this.position; + this.position += bytesLength; + + if (this.threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) { + this.waitForTaskToComplete(); + } + + final Future<Void> job = this.completionService.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + client.append(path, offset, bytes, 0, + bytesLength); + return null; + } + }); + + this.writeOperations.add(new WriteOperation(job, offset, bytesLength)); + + // Try to shrink the queue + shrinkWriteOperationQueue(); + } + + private synchronized void flushWrittenBytesToService() throws IOException { + for (WriteOperation writeOperation : this.writeOperations) { + try { + writeOperation.task.get(); + } catch (Exception ex) { + if (AzureBlobFileSystemException.class.isInstance(ex.getCause())) { + ex = AzureBlobFileSystemException.class.cast(ex.getCause()); + } + this.lastError = new IOException(ex); + throw this.lastError; + } + } + flushWrittenBytesToServiceInternal(this.position, false); + } + + private synchronized void flushWrittenBytesToServiceAsync() throws IOException { + shrinkWriteOperationQueue(); + + if (this.lastTotalAppendOffset > this.lastFlushOffset) { + this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true); + } + + this.lastTotalAppendOffset = 0; + } + + private synchronized void flushWrittenBytesToServiceInternal(final long offset, final boolean retainUncommitedData) throws IOException { + try { + client.flush(path, offset, retainUncommitedData); + } catch (AzureBlobFileSystemException ex) { + throw new IOException(ex); + } + this.lastFlushOffset = offset; + } + + /** + * Try to remove the completed write operations from the beginning of the write + * operation FIFO queue. 
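+ * As completed operations are removed, lastTotalAppendOffset advances by each
+ * operation's length; the asynchronous flush path uses that offset to decide
+ * how far the file can be flushed.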
+ */ + private synchronized void shrinkWriteOperationQueue() throws IOException { + try { + while (this.writeOperations.peek() != null && this.writeOperations.peek().task.isDone()) { + this.writeOperations.peek().task.get(); + this.lastTotalAppendOffset += this.writeOperations.peek().length; + this.writeOperations.remove(); + } + } catch (Exception e) { + if (AzureBlobFileSystemException.class.isInstance(e.getCause())) { + this.lastError = IOException.class.cast(e.getCause()); + } else { + this.lastError = new IOException(e); + } + throw this.lastError; + } + } + + private void waitForTaskToComplete() throws IOException { + boolean completed; + for (completed = false; this.completionService.poll() != null; completed = true) {} + + if (!completed) { + try { + this.completionService.take(); + } catch (InterruptedException e) { + this.lastError = new IOException(e); + throw this.lastError; + } + } + } + + private static class WriteOperation { + private final Future<Void> task; + private final long startOffset; + private final long length; + + WriteOperation(final Future<Void> task, final long startOffset, final long length) { + Preconditions.checkNotNull(task, "task"); + Preconditions.checkArgument(startOffset >= 0, "startOffset"); + Preconditions.checkArgument(length >= 0, "length"); + + this.task = task; + this.startOffset = startOffset; + this.length = length; + } + } +} \ No newline at end of file
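To make the write/flush lifecycle above concrete, here is a minimal usage sketch (hypothetical caller: the path, buffer size, and the pre-built AbfsClient are illustrative, and in the real filesystem the stream is constructed internally by the ABFS layer rather than by user code):

    // Hypothetical sketch only; `client` stands for an initialized AbfsClient.
    AbfsOutputStream out = new AbfsOutputStream(client, "/data/part-0000", 0, 4 * 1024 * 1024);
    byte[] record = "hello,world\n".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    for (int i = 0; i < 1000; i++) {
      out.write(record);   // buffered locally; a full buffer triggers an async append
    }
    out.hflush();          // synchronous flush: new readers now see the data
    out.close();           // waits for pending appends, then shuts down the pool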
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java new file mode 100644 index 0000000..17fc35a --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.List; + +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException; + +import org.slf4j.Logger; + +/** + * The AbfsRestOperation for Rest AbfsClient + */ +public class AbfsRestOperation { + // Blob FS client, which has the credentials, retry policy, and logs. + private final AbfsClient client; + // the HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE) + private final String method; + // full URL including query parameters + private final URL url; + // all the custom HTTP request headers provided by the caller + private final List<AbfsHttpHeader> requestHeaders; + + // This is a simple operation class, where all the upload methods have a + // request body and all the download methods have a response body. + private final boolean hasRequestBody; + + private final Logger logger; + + // For uploads, this is the request entity body. For downloads, + // this will hold the response entity body. + private byte[] buffer; + private int bufferOffset; + private int bufferLength; + + private AbfsHttpOperation result; + + public AbfsHttpOperation getResult() { + return result; + } + + /** + * Initializes a new REST operation. + * + * @param client The Blob FS client. + * @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE). + * @param url The full URL including query string parameters. + * @param requestHeaders The HTTP request headers. 
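+ *
+ * Only PUT and PATCH requests are treated as carrying a request body; that
+ * flag drives both the content length used for signing and whether the
+ * buffer is sent on the wire.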
+ */ + AbfsRestOperation(final AbfsClient client, + final String method, + final URL url, + final List<AbfsHttpHeader> requestHeaders) { + this.client = client; + this.method = method; + this.url = url; + this.requestHeaders = requestHeaders; + this.hasRequestBody = (AbfsHttpConstants.HTTP_METHOD_PUT.equals(method) + || AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method)); + this.logger = client.LOG; + } + + /** + * Initializes a new REST operation. + * + * @param client The Blob FS client. + * @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE). + * @param url The full URL including query string parameters. + * @param requestHeaders The HTTP request headers. + * @param buffer For uploads, this is the request entity body. For downloads, + * this will hold the response entity body. + * @param bufferOffset An offset into the buffer where the data begins. + * @param bufferLength The length of the data in the buffer. + */ + AbfsRestOperation(AbfsClient client, + String method, + URL url, + List<AbfsHttpHeader> requestHeaders, + byte[] buffer, + int bufferOffset, + int bufferLength) { + this(client, method, url, requestHeaders); + this.buffer = buffer; + this.bufferOffset = bufferOffset; + this.bufferLength = bufferLength; + } + + /** + * Executes the REST operation with retry, by issuing one or more + * HTTP operations. + */ + void execute() throws AzureBlobFileSystemException { + int retryCount = 0; + while (!executeHttpOperation(retryCount++)) { + try { + Thread.sleep(client.getRetryPolicy().getRetryInterval(retryCount)); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + + if (result.getStatusCode() >= HttpURLConnection.HTTP_BAD_REQUEST) { + throw new AbfsRestOperationException(result.getStatusCode(), result.getStorageErrorCode(), + result.getStorageErrorMessage(), null, result); + } + } + + /** + * Executes a single HTTP operation to complete the REST operation. If it + * fails, there may be a retry. The retryCount is incremented with each + * attempt. + */ + private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileSystemException { + AbfsHttpOperation httpOperation = null; + try { + // initialize the HTTP request and open the connection + httpOperation = new AbfsHttpOperation(url, method, requestHeaders); + + // sign the HTTP request + client.getSharedKeyCredentials().signRequest( + httpOperation.getConnection(), + hasRequestBody ? 
bufferLength : 0); + + if (hasRequestBody) { + // HttpUrlConnection requires + httpOperation.sendRequest(buffer, bufferOffset, bufferLength); + } + + httpOperation.processResponse(buffer, bufferOffset, bufferLength); + } catch (IOException ex) { + if (logger.isDebugEnabled()) { + if (httpOperation != null) { + logger.debug("HttpRequestFailure: " + httpOperation.toString(), ex); + } else { + logger.debug("HttpRequestFailure: " + method + "," + url, ex); + } + } + if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) { + throw new InvalidAbfsRestOperationException(ex); + } + return false; + } + + if (logger.isDebugEnabled()) { + logger.debug("HttpRequest: " + httpOperation.toString()); + } + + if (client.getRetryPolicy().shouldRetry(retryCount, httpOperation.getStatusCode())) { + return false; + } + + result = httpOperation; + + return true; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceInjectorImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceInjectorImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceInjectorImpl.java new file mode 100644 index 0000000..1cbf6b5 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceInjectorImpl.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.util.HashMap; +import java.util.Map; + +import com.google.inject.AbstractModule; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpClientFactory; +import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpService; +import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService; +import org.apache.hadoop.fs.azurebfs.contracts.services.TracingService; + +/** + * This class is responsible for configuring all the services used by Azure Blob File System. 
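+ * <p>
+ * A sketch of how a module like this is consumed (hypothetical; the real
+ * entry point is AbfsServiceProviderImpl):
+ * <pre>
+ *   Injector injector = Guice.createInjector(new AbfsServiceInjectorImpl(conf));
+ *   ConfigurationService config = injector.getInstance(ConfigurationService.class);
+ * </pre>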
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +class AbfsServiceInjectorImpl extends AbstractModule { + private final Configuration configuration; + private final Map<Class, Class> providers; + private final Map<Class, Object> instances; + + AbfsServiceInjectorImpl(Configuration configuration) { + this.providers = new HashMap<>(); + this.instances = new HashMap<>(); + this.configuration = configuration; + + this.instances.put(Configuration.class, this.configuration); + + this.providers.put(ConfigurationService.class, ConfigurationServiceImpl.class); + + this.providers.put(AbfsHttpService.class, AbfsHttpServiceImpl.class); + this.providers.put(AbfsHttpClientFactory.class, AbfsHttpClientFactoryImpl.class); + + this.providers.put(TracingService.class, TracingServiceImpl.class); + } + + @Override + protected void configure() { + for (Map.Entry<Class, Object> entrySet : this.instances.entrySet()) { + bind(entrySet.getKey()).toInstance(entrySet.getValue()); + } + + for (Map.Entry<Class, Class> entrySet : this.providers.entrySet()) { + bind(entrySet.getKey()).to(entrySet.getValue()); + } + } + + protected Configuration getConfiguration() { + return this.configuration; + } + + protected Map<Class, Class> getProviders() { + return this.providers; + } + + protected Map<Class, Object> getInstances() { + return this.instances; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceProviderImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceProviderImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceProviderImpl.java new file mode 100644 index 0000000..8560620 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceProviderImpl.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs.services; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.inject.Guice; +import com.google.inject.Injector; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ServiceResolutionException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsServiceProvider; +import org.apache.hadoop.fs.azurebfs.contracts.services.InjectableService; + +/** + * Dependency injected Azure Storage services provider. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public final class AbfsServiceProviderImpl implements AbfsServiceProvider { + private static AbfsServiceProviderImpl abfsServiceProvider; + private final Injector abfsServiceInjector; + + private AbfsServiceProviderImpl(final Configuration configuration) { + this.abfsServiceInjector = Guice.createInjector(new AbfsServiceInjectorImpl(Preconditions.checkNotNull(configuration, "configuration"))); + } + + @VisibleForTesting + private AbfsServiceProviderImpl(final Injector abfsServiceInjector) { + Preconditions.checkNotNull(abfsServiceInjector, "abfsServiceInjector"); + this.abfsServiceInjector = abfsServiceInjector; + } + + /** + * Creates an instance of the service provider, or returns the existing instance. + * This method must be marked as synchronized to ensure thread-safety. + * @param configuration hadoop configuration. + * @return AbfsServiceProvider the service provider instance. + */ + public static synchronized AbfsServiceProvider create(final Configuration configuration) { + if (abfsServiceProvider == null) { + abfsServiceProvider = new AbfsServiceProviderImpl(configuration); + } + + return abfsServiceProvider; + } + + /** + * Returns the current instance of the service provider. + * @return AbfsServiceProvider the service provider instance. + */ + public static AbfsServiceProvider instance() { + return abfsServiceProvider; + } + + @VisibleForTesting + static synchronized AbfsServiceProvider create(Injector serviceInjector) { + abfsServiceProvider = new AbfsServiceProviderImpl(serviceInjector); + return abfsServiceProvider; + } + + /** + * Returns an instance of the resolved injectable service by class name. + * The injectable service must be configured first to be resolvable. + * @param clazz the injectable service which is expected to be returned. + * @param <T> The type of injectable service. + * @return T instance + * @throws ServiceResolutionException if the service is not resolvable. 
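+ *
+ * For example (hypothetical call site, assuming create() has already been
+ * invoked during filesystem initialization):
+ * <pre>
+ *   TracingService tracing = AbfsServiceProviderImpl.instance().get(TracingService.class);
+ * </pre>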
+ */ + @Override + public <T extends InjectableService> T get(final Class<T> clazz) throws ServiceResolutionException { + try { + return this.abfsServiceInjector.getInstance(clazz); + } catch (Exception ex) { + throw new ServiceResolutionException(clazz.getSimpleName(), ex); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java new file mode 100644 index 0000000..bac66af --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; + +import java.util.HashMap; +import java.util.Map; + +/** + * The UrlQueryBuilder for Rest AbfsClient + */ +public class AbfsUriQueryBuilder { + private Map<String, String> parameters; + + public AbfsUriQueryBuilder() { + this.parameters = new HashMap<>(); + } + + public void addQuery(final String name, final String value) { + if (value != null && !value.isEmpty()) { + this.parameters.put(name, value); + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + boolean first = true; + + for (Map.Entry<String, String> entry : parameters.entrySet()) { + if (first) { + sb.append(AbfsHttpConstants.QUESTION_MARK); + first = false; + } else { + sb.append(AbfsHttpConstants.AND_MARK); + } + sb.append(entry.getKey()).append(AbfsHttpConstants.EQUAL).append(entry.getValue()); + } + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ConfigurationServiceImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ConfigurationServiceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ConfigurationServiceImpl.java new file mode 100644 index 0000000..568ee5d --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ConfigurationServiceImpl.java @@ -0,0 +1,317 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.lang.reflect.Field; +import java.util.Map; + +import com.google.common.annotations.VisibleForTesting; +import com.google.inject.Inject; +import com.google.inject.Singleton; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; +import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations; +import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerConfigurationValidatorAnnotation; +import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.LongConfigurationValidatorAnnotation; +import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.StringConfigurationValidatorAnnotation; +import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation; +import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.BooleanConfigurationValidatorAnnotation; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException; +import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService; +import org.apache.hadoop.fs.azurebfs.diagnostics.Base64StringConfigurationBasicValidator; +import org.apache.hadoop.fs.azurebfs.diagnostics.BooleanConfigurationBasicValidator; +import org.apache.hadoop.fs.azurebfs.diagnostics.IntegerConfigurationBasicValidator; +import org.apache.hadoop.fs.azurebfs.diagnostics.LongConfigurationBasicValidator; +import org.apache.hadoop.fs.azurebfs.diagnostics.StringConfigurationBasicValidator; + +@Singleton +@InterfaceAudience.Private +@InterfaceStability.Evolving +class ConfigurationServiceImpl implements ConfigurationService { + private final Configuration configuration; + private final boolean isSecure; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_WRITE_BUFFER_SIZE, + MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE, + MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE, + DefaultValue = FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE) + private int writeBufferSize; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_READ_BUFFER_SIZE, + MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE, + MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE, + DefaultValue = FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE) + private int readBufferSize; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
ConfigurationKeys.AZURE_MIN_BACKOFF_INTERVAL, + DefaultValue = FileSystemConfigurations.DEFAULT_MIN_BACKOFF_INTERVAL) + private int minBackoffInterval; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MAX_BACKOFF_INTERVAL, + DefaultValue = FileSystemConfigurations.DEFAULT_MAX_BACKOFF_INTERVAL) + private int maxBackoffInterval; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BACKOFF_INTERVAL, + DefaultValue = FileSystemConfigurations.DEFAULT_BACKOFF_INTERVAL) + private int backoffInterval; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MAX_IO_RETRIES, + MinValue = 0, + DefaultValue = FileSystemConfigurations.DEFAULT_MAX_RETRY_ATTEMPTS) + private int maxIoRetries; + + @LongConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BLOCK_SIZE_PROPERTY_NAME, + MinValue = 0, + MaxValue = FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE, + DefaultValue = FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE) + private long azureBlockSize; + + @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME, + DefaultValue = FileSystemConfigurations.AZURE_BLOCK_LOCATION_HOST_DEFAULT) + private String azureBlockLocationHost; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CONCURRENT_CONNECTION_VALUE_OUT, + MinValue = 1, + DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_WRITE_THREADS) + private int maxConcurrentWriteThreads; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CONCURRENT_CONNECTION_VALUE_IN, + MinValue = 1, + DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_READ_THREADS) + private int maxConcurrentReadThreads; + + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_TOLERATE_CONCURRENT_APPEND, + DefaultValue = FileSystemConfigurations.DEFAULT_READ_TOLERATE_CONCURRENT_APPEND) + private boolean tolerateOobAppends; + + @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_ATOMIC_RENAME_KEY, + DefaultValue = FileSystemConfigurations.DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES) + private String azureAtomicDirs; + + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, + DefaultValue = FileSystemConfigurations.DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION) + private boolean createRemoteFileSystemDuringInitialization; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH, + DefaultValue = FileSystemConfigurations.DEFAULT_READ_AHEAD_QUEUE_DEPTH) + private int readAheadQueueDepth; + + private Map<String, String> storageAccountKeys; + + @Inject + ConfigurationServiceImpl(final Configuration configuration) throws IllegalAccessException, InvalidConfigurationValueException { + this.configuration = configuration; + this.isSecure = this.configuration.getBoolean(ConfigurationKeys.FS_AZURE_SECURE_MODE, false); + + validateStorageAccountKeys(); + Field[] fields = this.getClass().getDeclaredFields(); + for (Field field : fields) { + field.setAccessible(true); + if (field.isAnnotationPresent(IntegerConfigurationValidatorAnnotation.class)) { + field.set(this, validateInt(field)); + } else if (field.isAnnotationPresent(LongConfigurationValidatorAnnotation.class)) { + field.set(this, validateLong(field)); + } else if 
(field.isAnnotationPresent(StringConfigurationValidatorAnnotation.class)) { + field.set(this, validateString(field)); + } else if (field.isAnnotationPresent(Base64StringConfigurationValidatorAnnotation.class)) { + field.set(this, validateBase64String(field)); + } else if (field.isAnnotationPresent(BooleanConfigurationValidatorAnnotation.class)) { + field.set(this, validateBoolean(field)); + } + } + } + + @Override + public boolean isEmulator() { + return this.getConfiguration().getBoolean(ConfigurationKeys.FS_AZURE_EMULATOR_ENABLED, false); + } + + @Override + public boolean isSecureMode() { + return this.isSecure; + } + + @Override + public String getStorageAccountKey(final String accountName) throws ConfigurationPropertyNotFoundException { + String accountKey = this.storageAccountKeys.get(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME + accountName); + if (accountKey == null) { + throw new ConfigurationPropertyNotFoundException(accountName); + } + + return accountKey; + } + + @Override + public Configuration getConfiguration() { + return this.configuration; + } + + @Override + public int getWriteBufferSize() { + return this.writeBufferSize; + } + + @Override + public int getReadBufferSize() { + return this.readBufferSize; + } + + @Override + public int getMinBackoffIntervalMilliseconds() { + return this.minBackoffInterval; + } + + @Override + public int getMaxBackoffIntervalMilliseconds() { + return this.maxBackoffInterval; + } + + @Override + public int getBackoffIntervalMilliseconds() { + return this.backoffInterval; + } + + @Override + public int getMaxIoRetries() { + return this.maxIoRetries; + } + + @Override + public long getAzureBlockSize() { + return this.azureBlockSize; + } + + @Override + public String getAzureBlockLocationHost() { + return this.azureBlockLocationHost; + } + + @Override + public int getMaxConcurrentWriteThreads() { + return this.maxConcurrentWriteThreads; + } + + @Override + public int getMaxConcurrentReadThreads() { + return this.maxConcurrentReadThreads; + } + + @Override + public boolean getTolerateOobAppends() { + return this.tolerateOobAppends; + } + + @Override + public String getAzureAtomicRenameDirs() { + return this.azureAtomicDirs; + } + + @Override + public boolean getCreateRemoteFileSystemDuringInitialization() { + return this.createRemoteFileSystemDuringInitialization; + } + + @Override + public int getReadAheadQueueDepth() { + return this.readAheadQueueDepth; + } + + void validateStorageAccountKeys() throws InvalidConfigurationValueException { + Base64StringConfigurationBasicValidator validator = new Base64StringConfigurationBasicValidator( + ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, "", true); + this.storageAccountKeys = this.configuration.getValByRegex(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX); + + for (Map.Entry<String, String> account : this.storageAccountKeys.entrySet()) { + validator.validate(account.getValue()); + } + } + + int validateInt(Field field) throws IllegalAccessException, InvalidConfigurationValueException { + IntegerConfigurationValidatorAnnotation validator = field.getAnnotation(IntegerConfigurationValidatorAnnotation.class); + String value = this.configuration.get(validator.ConfigurationKey()); + + // validate + return new IntegerConfigurationBasicValidator( + validator.MinValue(), + validator.MaxValue(), + validator.DefaultValue(), + validator.ConfigurationKey(), + validator.ThrowIfInvalid()).validate(value); + } + + long validateLong(Field field) throws IllegalAccessException, 
InvalidConfigurationValueException { + LongConfigurationValidatorAnnotation validator = field.getAnnotation(LongConfigurationValidatorAnnotation.class); + String value = this.configuration.get(validator.ConfigurationKey()); + + // validate + return new LongConfigurationBasicValidator( + validator.MinValue(), + validator.MaxValue(), + validator.DefaultValue(), + validator.ConfigurationKey(), + validator.ThrowIfInvalid()).validate(value); + } + + String validateString(Field field) throws IllegalAccessException, InvalidConfigurationValueException { + StringConfigurationValidatorAnnotation validator = field.getAnnotation(StringConfigurationValidatorAnnotation.class); + String value = this.configuration.get(validator.ConfigurationKey()); + + // validate + return new StringConfigurationBasicValidator( + validator.ConfigurationKey(), + validator.DefaultValue(), + validator.ThrowIfInvalid()).validate(value); + } + + String validateBase64String(Field field) throws IllegalAccessException, InvalidConfigurationValueException { + Base64StringConfigurationValidatorAnnotation validator = field.getAnnotation((Base64StringConfigurationValidatorAnnotation.class)); + String value = this.configuration.get(validator.ConfigurationKey()); + + // validate + return new Base64StringConfigurationBasicValidator( + validator.ConfigurationKey(), + validator.DefaultValue(), + validator.ThrowIfInvalid()).validate(value); + } + + boolean validateBoolean(Field field) throws IllegalAccessException, InvalidConfigurationValueException { + BooleanConfigurationValidatorAnnotation validator = field.getAnnotation(BooleanConfigurationValidatorAnnotation.class); + String value = this.configuration.get(validator.ConfigurationKey()); + + // validate + return new BooleanConfigurationBasicValidator( + validator.ConfigurationKey(), + validator.DefaultValue(), + validator.ThrowIfInvalid()).validate(value); + } + + @VisibleForTesting + void setReadBufferSize(int bufferSize) { + this.readBufferSize = bufferSize; + } + + @VisibleForTesting + void setWriteBufferSize(int bufferSize) { + this.writeBufferSize = bufferSize; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java new file mode 100644 index 0000000..0c92612 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.util.Random; +import java.net.HttpURLConnection; + +class ExponentialRetryPolicy { + /** + * Represents the default number of retry attempts. + */ + private static final int DEFAULT_CLIENT_RETRY_COUNT = 30; + + /** + * Represents the default amount of time used when calculating a random delta in the exponential + * delay between retries. + */ + private static final int DEFAULT_CLIENT_BACKOFF = 1000 * 3; + + /** + * Represents the default maximum amount of time used when calculating the exponential + * delay between retries. + */ + private static final int DEFAULT_MAX_BACKOFF = 1000 * 30; + + /** + * Represents the default minimum amount of time used when calculating the exponential + * delay between retries. + */ + private static final int DEFAULT_MIN_BACKOFF = 1000 * 3; + + /** + * The minimum random ratio used for delay interval calculation. + */ + private static final double MIN_RANDOM_RATIO = 0.8; + + /** + * The maximum random ratio used for delay interval calculation. + */ + private static final double MAX_RANDOM_RATIO = 1.2; + + /** + * Holds the random number generator used to calculate randomized backoff intervals. + */ + private final Random randRef = new Random(); + + /** + * The value that will be used to calculate a random delta in the exponential delay interval. + */ + private final int deltaBackoff; + + /** + * The maximum backoff time. + */ + private final int maxBackoff; + + /** + * The minimum backoff time. + */ + private final int minBackoff; + + /** + * The maximum number of retry attempts. + */ + private final int retryCount; + + /** + * Initializes a new instance of the {@link ExponentialRetryPolicy} class. + */ + ExponentialRetryPolicy() { + this(DEFAULT_CLIENT_RETRY_COUNT, DEFAULT_MIN_BACKOFF, DEFAULT_MAX_BACKOFF, DEFAULT_CLIENT_BACKOFF); + } + + /** + * Initializes a new instance of the {@link ExponentialRetryPolicy} class. + * + * @param retryCount The maximum number of retry attempts. + * @param minBackoff The minimum backoff time. + * @param maxBackoff The maximum backoff time. + * @param deltaBackoff The value that will be used to calculate a random delta in the exponential delay + * between retries. + */ + ExponentialRetryPolicy(final int retryCount, final int minBackoff, final int maxBackoff, final int deltaBackoff) { + this.retryCount = retryCount; + this.minBackoff = minBackoff; + this.maxBackoff = maxBackoff; + this.deltaBackoff = deltaBackoff; + } + + /** + * Returns whether a request should be retried based on the retry count, the current response, + * and the current strategy. + * + * @param retryCount The current retry attempt count. + * @param statusCode The status code of the response, or -1 for socket error. + * @return true if the request should be retried; false otherwise. + */ + public boolean shouldRetry(final int retryCount, final int statusCode) { + return retryCount < this.retryCount + && (statusCode == -1 + || statusCode == HttpURLConnection.HTTP_CLIENT_TIMEOUT + || (statusCode >= HttpURLConnection.HTTP_INTERNAL_ERROR + && statusCode != HttpURLConnection.HTTP_NOT_IMPLEMENTED + && statusCode != HttpURLConnection.HTTP_VERSION)); + } + + /** + * Returns a backoff interval between 80% and 120% of the desired backoff, + * multiplied by 2^(n-1) for exponential growth. For example, with the default + * deltaBackoff of 3000 ms, the third retry (n = 3) waits roughly + * min(minBackoff + 4 * [2400, 3600] ms, maxBackoff). + * + * @param retryCount The current retry attempt count. 
+ * @return the backoff interval, in milliseconds + */ + public long getRetryInterval(final int retryCount) { + final long boundedRandDelta = (int) (this.deltaBackoff * MIN_RANDOM_RATIO) + + this.randRef.nextInt((int) (this.deltaBackoff * MAX_RANDOM_RATIO) + - (int) (this.deltaBackoff * MIN_RANDOM_RATIO)); + + final double incrementDelta = (Math.pow(2, retryCount - 1)) * boundedRandDelta; + + final long retryInterval = Math.round(Math.min(this.minBackoff + incrementDelta, maxBackoff)); + + return retryInterval; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/LoggerSpanReceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/LoggerSpanReceiver.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/LoggerSpanReceiver.java new file mode 100644 index 0000000..99190e6 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/LoggerSpanReceiver.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; + +import com.google.common.base.Preconditions; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.htrace.core.HTraceConfiguration; +import org.apache.htrace.core.Span; +import org.apache.htrace.core.SpanReceiver; +import org.apache.htrace.fasterxml.jackson.core.JsonProcessingException; +import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper; +import org.apache.htrace.fasterxml.jackson.databind.ObjectWriter; +import org.apache.htrace.fasterxml.jackson.databind.SerializationFeature; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * LoggerSpanReceiver is a layer between HTrace and log4j only used for {@link org.apache.hadoop.fs.azurebfs.contracts.services.TracingService} + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class LoggerSpanReceiver extends SpanReceiver { + private static final ObjectWriter JSON_WRITER = + new ObjectMapper() + .configure(SerializationFeature.INDENT_OUTPUT, true) + .configure(SerializationFeature.WRITE_BIGDECIMAL_AS_PLAIN, true) + .configure(SerializationFeature.WRITE_EMPTY_JSON_ARRAYS, false) + .configure(SerializationFeature.USE_EQUALITY_FOR_OBJECT_ID, false) + .writer(); + + public LoggerSpanReceiver(HTraceConfiguration hTraceConfiguration) { + Preconditions.checkNotNull(hTraceConfiguration, "hTraceConfiguration"); + } + + @Override + public void receiveSpan(final Span span) { + String jsonValue; + + Logger logger = LoggerFactory.getLogger(AzureBlobFileSystem.class); + + try { + jsonValue = JSON_WRITER.writeValueAsString(span); + logger.trace(jsonValue); + } catch (JsonProcessingException e) { + logger.error("Json processing error: " + e.getMessage()); + } + } + + @Override + public void close() throws IOException { + // No-Op + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java new file mode 100644 index 0000000..1fac13d --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs.services; + +import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; + +import java.util.concurrent.CountDownLatch; + +class ReadBuffer { + + private AbfsInputStream stream; + private long offset; // offset within the file for the buffer + private int length; // actual length, set after the buffer is filled + private int requestedLength; // requested length of the read + private byte[] buffer; // the buffer itself + private int bufferindex = -1; // index in the buffers array in Buffer manager + private ReadBufferStatus status; // status of the buffer + private CountDownLatch latch = null; // signaled when the buffer is done reading, so any client + // waiting on this buffer gets unblocked + + // fields to help with eviction logic + private long timeStamp = 0; // tick at which buffer became available to read + private boolean isFirstByteConsumed = false; + private boolean isLastByteConsumed = false; + private boolean isAnyByteConsumed = false; + + public AbfsInputStream getStream() { + return stream; + } + + public void setStream(AbfsInputStream stream) { + this.stream = stream; + } + + public long getOffset() { + return offset; + } + + public void setOffset(long offset) { + this.offset = offset; + } + + public int getLength() { + return length; + } + + public void setLength(int length) { + this.length = length; + } + + public int getRequestedLength() { + return requestedLength; + } + + public void setRequestedLength(int requestedLength) { + this.requestedLength = requestedLength; + } + + public byte[] getBuffer() { + return buffer; + } + + public void setBuffer(byte[] buffer) { + this.buffer = buffer; + } + + public int getBufferindex() { + return bufferindex; + } + + public void setBufferindex(int bufferindex) { + this.bufferindex = bufferindex; + } + + public ReadBufferStatus getStatus() { + return status; + } + + public void setStatus(ReadBufferStatus status) { + this.status = status; + } + + public CountDownLatch getLatch() { + return latch; + } + + public void setLatch(CountDownLatch latch) { + this.latch = latch; + } + + public long getTimeStamp() { + return timeStamp; + } + + public void setTimeStamp(long timeStamp) { + this.timeStamp = timeStamp; + } + + public boolean isFirstByteConsumed() { + return isFirstByteConsumed; + } + + public void setFirstByteConsumed(boolean isFirstByteConsumed) { + this.isFirstByteConsumed = isFirstByteConsumed; + } + + public boolean isLastByteConsumed() { + return isLastByteConsumed; + } + + public void setLastByteConsumed(boolean isLastByteConsumed) { + this.isLastByteConsumed = isLastByteConsumed; + } + + public boolean isAnyByteConsumed() { + return isAnyByteConsumed; + } + + public void setAnyByteConsumed(boolean isAnyByteConsumed) { + this.isAnyByteConsumed = isAnyByteConsumed; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java new file mode 100644 index 0000000..164e549 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -0,0 +1,391 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.azurebfs.services; + +import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.LinkedList; +import java.util.Queue; +import java.util.Stack; +import java.util.concurrent.CountDownLatch; + +/** + * The Read Buffer Manager for Rest AbfsClient + */ +final class ReadBufferManager { + private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class); + + private static final int NUM_BUFFERS = 16; + private static final int BLOCK_SIZE = 4 * 1024 * 1024; + private static final int NUM_THREADS = 8; + private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold + + private Thread[] threads = new Thread[NUM_THREADS]; + private byte[][] buffers; // array of byte[] buffers, to hold the data that is read + private Stack<Integer> freeList = new Stack<Integer>(); // indices in buffers[] array that are available + + private Queue<ReadBuffer> readAheadQueue = new LinkedList<ReadBuffer>(); // queue of requests that are not picked up by any worker thread yet + private LinkedList<ReadBuffer> inProgressList = new LinkedList<ReadBuffer>(); // requests being processed by worker threads + private LinkedList<ReadBuffer> completedReadList = new LinkedList<ReadBuffer>(); // buffers available for reading + private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block + + static { + BUFFER_MANAGER = new ReadBufferManager(); + BUFFER_MANAGER.init(); + } + + static ReadBufferManager getBufferManager() { + return BUFFER_MANAGER; + } + + private void init() { + buffers = new byte[NUM_BUFFERS][]; + for (int i = 0; i < NUM_BUFFERS; i++) { + buffers[i] = new byte[BLOCK_SIZE]; // same buffers are reused. 
+ + + /* + * + * AbfsInputStream-facing methods + * + */ + + + /** + * {@link AbfsInputStream} calls this method to queue read-aheads + * + * @param stream The {@link AbfsInputStream} for which to do the read-ahead + * @param requestedOffset The offset in the file which should be read + * @param requestedLength The length to read + */ + void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, final int requestedLength) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Start queueing readAhead for " + stream.getPath() + " offset " + requestedOffset + + " length " + requestedLength); + } + ReadBuffer buffer; + synchronized (this) { + if (isAlreadyQueued(stream, requestedOffset)) { + return; // already queued, do not queue again + } + if (freeList.size() == 0 && !tryEvict()) { + return; // no buffers available, cannot queue anything + } +
+ buffer = new ReadBuffer(); + buffer.setStream(stream); + buffer.setOffset(requestedOffset); + buffer.setLength(0); + buffer.setRequestedLength(requestedLength); + buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE); + buffer.setLatch(new CountDownLatch(1)); +
+ Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already +
+ buffer.setBuffer(buffers[bufferIndex]); + buffer.setBufferindex(bufferIndex); + readAheadQueue.add(buffer); + notifyAll(); + } + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Done queueing readAhead for file " + stream.getPath() + " offset " + requestedOffset + + " buffer idx " + buffer.getBufferindex()); + } + } +
+ + /** + * {@link AbfsInputStream} calls this method to read any bytes already available in a buffer (thereby saving a + * remote read). This returns the bytes if the data already exists in a buffer. If there is a buffer that is reading + * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead + * but not picked up by a worker thread yet, then it cancels that read-ahead and reports a cache miss. This is because, + * depending on worker thread availability, the read-ahead may take a while - the calling thread can do its own + * read to get the data faster (compared to the read waiting in queue for an indeterminate amount of time). + * + * @param stream the file to read bytes for + * @param position the offset in the file to do a read for + * @param length the length to read + * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0. + * @return the number of bytes read + */ + int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer) { + // not synchronized, so have to be careful with locking + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("getBlock for file " + stream.getPath() + " position " + position + " thread " + Thread.currentThread().getName()); + } +
+ waitForProcess(stream, position); +
+ int bytesRead = 0; + synchronized (this) { + bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer); + } + if (bytesRead > 0) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Done read from Cache for " + stream.getPath() + " position " + position + " length " + bytesRead); + } + return bytesRead; + } +
+ // otherwise, just say we got nothing - calling thread can do its own read + return 0; + }
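Taken together, queueReadAhead and getBlock define the caller's protocol: try the cache first, fall back to a synchronous remote read on a miss, and keep the pipeline primed. A sketch of that calling pattern follows; the two interfaces are hypothetical stand-ins for AbfsInputStream.readRemote and the manager's entry points, not the patch's actual API:

    public final class ReadAheadPattern {
      /** Hypothetical stand-in for AbfsInputStream.readRemote. */
      interface RemoteReader {
        int readRemote(long offset, byte[] b, int off, int len) throws java.io.IOException;
      }

      /** Hypothetical stand-in for the two ReadBufferManager entry points the stream uses. */
      interface Cache {
        int getBlock(long position, int length, byte[] b); // returns 0 on a miss
        void queueReadAhead(long offset, int length);      // fire-and-forget
      }

      static int read(Cache cache, RemoteReader remote, long pos, byte[] b, int len)
          throws java.io.IOException {
        int fromCache = cache.getBlock(pos, len, b);
        if (fromCache > 0) {
          return fromCache;                         // read-ahead already fetched these bytes
        }
        int n = remote.readRemote(pos, b, 0, len);  // cache miss: do the read synchronously
        cache.queueReadAhead(pos + n, len);         // prime the next block in the background
        return n;
      }
    }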
+ + /* + * + * Internal methods + * + */ +
+ private void waitForProcess(final AbfsInputStream stream, final long position) { + ReadBuffer readBuf; + synchronized (this) { + clearFromReadAheadQueue(stream, position); + readBuf = getFromList(inProgressList, stream, position); + } + if (readBuf != null) { // if in in-progress queue, then block for it + try { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("got a relevant read buffer for file " + stream.getPath() + " offset " + readBuf.getOffset() + + " buffer idx " + readBuf.getBufferindex()); + } + readBuf.getLatch().await(); // blocking wait on the caller stream's thread + // Note on correctness: readBuf gets out of inProgressList only in 1 place: after the worker thread + // is done processing it (in doneReading). There, the latch is set after removing the buffer from + // inProgressList. So this latch is safe to be outside the synchronized block. + // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock + // while waiting, so no one would be able to change any state. If this becomes more complex in the future, + // then the latch can be removed and replaced with wait/notify whenever inProgressList is touched. + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("latch done for file " + stream.getPath() + " buffer idx " + readBuf.getBufferindex() + + " length " + readBuf.getLength()); + } + } + } +
+ /** + * If any buffer in the completedReadList can be reclaimed then reclaim it and return the buffer to the free list. + * The objective is to find just one buffer - there is no advantage to evicting more than one. + * + * @return whether the eviction succeeded - i.e., were we able to free up one buffer + */ + private synchronized boolean tryEvict() { + ReadBuffer nodeToEvict = null; + if (completedReadList.size() <= 0) { + return false; // there are no evict-able buffers + } +
+ // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed) + for (ReadBuffer buf : completedReadList) { + if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) { + nodeToEvict = buf; + break; + } + } + if (nodeToEvict != null) { + return evict(nodeToEvict); + } +
+ // next, try buffers where any bytes have been consumed (may be a bad idea? have to experiment and see) + for (ReadBuffer buf : completedReadList) { + if (buf.isAnyByteConsumed()) { + nodeToEvict = buf; + break; + } + } +
+ if (nodeToEvict != null) { + return evict(nodeToEvict); + } +
+ // next, try any old nodes that have not been consumed + long earliestBirthday = Long.MAX_VALUE; + for (ReadBuffer buf : completedReadList) { + if (buf.getTimeStamp() < earliestBirthday) { + nodeToEvict = buf; + earliestBirthday = buf.getTimeStamp(); + } + } + if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) { + return evict(nodeToEvict); + } +
+ // nothing can be evicted + return false; + } +
+ private boolean evict(final ReadBuffer buf) { + freeList.push(buf.getBufferindex()); + completedReadList.remove(buf); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Evicting buffer idx " + buf.getBufferindex() + "; was used for file " + buf.getStream().getPath() + + " offset " + buf.getOffset() + " length " + buf.getLength()); + } + return true; + }
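The three eviction tiers above are easy to miss in the loop structure: fully consumed buffers go first, partially consumed ones next, and untouched buffers only once they have aged past THRESHOLD_AGE_MILLISECONDS. A compact restatement of the policy over plain value objects (illustrative types, not the patch's):

    import java.util.List;

    public final class EvictionPolicy {
      static final long THRESHOLD_AGE_MILLIS = 3000;

      /** Illustrative stand-in for the eviction-relevant ReadBuffer fields. */
      static final class Entry {
        boolean firstByteConsumed, lastByteConsumed, anyByteConsumed;
        long timeStamp;
      }

      /** Returns the entry to evict, or null if nothing is evictable yet. */
      static Entry pickVictim(List<Entry> completed, long now) {
        if (completed.isEmpty()) {
          return null;
        }
        for (Entry e : completed) {                        // tier 1: fully consumed
          if (e.firstByteConsumed && e.lastByteConsumed) {
            return e;
          }
        }
        for (Entry e : completed) {                        // tier 2: partially consumed
          if (e.anyByteConsumed) {
            return e;
          }
        }
        Entry oldest = completed.get(0);                   // tier 3: oldest, if old enough
        for (Entry e : completed) {
          if (e.timeStamp < oldest.timeStamp) {
            oldest = e;
          }
        }
        return (now - oldest.timeStamp > THRESHOLD_AGE_MILLIS) ? oldest : null;
      }
    }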
+ + private boolean isAlreadyQueued(final AbfsInputStream stream, final long requestedOffset) { + // returns true if any part of the buffer is already queued + return (isInList(readAheadQueue, stream, requestedOffset) + || isInList(inProgressList, stream, requestedOffset) + || isInList(completedReadList, stream, requestedOffset)); + } +
+ private boolean isInList(final Collection<ReadBuffer> list, final AbfsInputStream stream, final long requestedOffset) { + return (getFromList(list, stream, requestedOffset) != null); + } +
+ private ReadBuffer getFromList(final Collection<ReadBuffer> list, final AbfsInputStream stream, final long requestedOffset) { + for (ReadBuffer buffer : list) { + if (buffer.getStream() == stream) { + if (buffer.getStatus() == ReadBufferStatus.AVAILABLE + && requestedOffset >= buffer.getOffset() + && requestedOffset < buffer.getOffset() + buffer.getLength()) { + return buffer; + } else if (requestedOffset >= buffer.getOffset() + && requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) { + return buffer; + } + } + } + return null; + } +
+ private void clearFromReadAheadQueue(final AbfsInputStream stream, final long requestedOffset) { + ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset); + if (buffer != null) { + readAheadQueue.remove(buffer); + notifyAll(); // lock is held in calling method + freeList.push(buffer.getBufferindex()); + } + } +
+ private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long position, final int length, + final byte[] buffer) { + ReadBuffer buf = getFromList(completedReadList, stream, position); + if (buf == null || position >= buf.getOffset() + buf.getLength()) { + return 0; + } + int cursor = (int) (position - buf.getOffset()); + int availableLengthInBuffer = buf.getLength() - cursor; + int lengthToCopy = Math.min(length, availableLengthInBuffer); + System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy); + if (cursor == 0) { + buf.setFirstByteConsumed(true); + } + if (cursor + lengthToCopy == buf.getLength()) { + buf.setLastByteConsumed(true); + } + buf.setAnyByteConsumed(true); + return lengthToCopy; + }
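The arithmetic in getBlockFromCompletedQueue deserves a worked example: cursor is the caller's position relative to the buffer's start, and the copy length is capped by what remains in the buffer past the cursor. With made-up values:

    public final class CopyMathDemo {
      public static void main(String[] args) {
        long bufOffset = 8_388_608L;   // buffer holds file bytes [8 MB, 8 MB + bufLength)
        int bufLength = 4_194_304;     // 4 MB actually read into the buffer
        long position = 8_390_000L;    // caller wants bytes starting here
        int requested = 8_192;         // caller asked for 8 KB

        int cursor = (int) (position - bufOffset);    // 1392: offset inside the buffer
        int available = bufLength - cursor;           // bytes left from cursor to buffer end
        int toCopy = Math.min(requested, available);  // never copy past the buffer end

        System.out.println("cursor=" + cursor + " available=" + available + " toCopy=" + toCopy);
        // System.arraycopy(buf, cursor, dest, 0, toCopy) would then perform the copy.
      }
    }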
+ + /* + * + * ReadBufferWorker-thread-facing methods + * + */ +
+ /** + * ReadBufferWorker thread calls this to get the next buffer that it should work on. + * + * @return {@link ReadBuffer} + * @throws InterruptedException if thread is interrupted + */ + ReadBuffer getNextBlockToRead() throws InterruptedException { + ReadBuffer buffer = null; + synchronized (this) { + while (readAheadQueue.size() == 0) { + wait(); + } + buffer = readAheadQueue.remove(); + notifyAll(); + if (buffer == null) { + return null; // should never happen + } + buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS); + inProgressList.add(buffer); + } + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("ReadBufferWorker picked file " + buffer.getStream().getPath() + " for offset " + buffer.getOffset()); + } + return buffer; + } +
+ /** + * ReadBufferWorker thread calls this method to post completion + * + * @param buffer the buffer whose read was completed + * @param result the {@link ReadBufferStatus} after the read operation in the worker thread + * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read + */ + void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("ReadBufferWorker completed file " + buffer.getStream().getPath() + " for offset " + buffer.getOffset() + + " bytes " + bytesActuallyRead); + } + synchronized (this) { + inProgressList.remove(buffer); + if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) { + buffer.setStatus(ReadBufferStatus.AVAILABLE); + buffer.setTimeStamp(currentTimeMillis()); + buffer.setLength(bytesActuallyRead); + completedReadList.add(buffer); + } else { + freeList.push(buffer.getBufferindex()); + // buffer should go out of scope after the end of the calling method in ReadBufferWorker, and be eligible for GC + } + } + // outside the synchronized block, since anyone receiving a wake-up from the latch must see safe-published results + buffer.getLatch().countDown(); // wake up waiting threads (if any) + } +
+ /** + * Similar to System.currentTimeMillis, except implemented with System.nanoTime(). + * System.currentTimeMillis can go backwards when the system clock is changed (e.g., with NTP time synchronization), + * making it unsuitable for measuring time intervals. System.nanoTime() is monotonically increasing, + * so it is much better suited to measuring intervals. + * + * @return current time in milliseconds + */ + private long currentTimeMillis() { + return System.nanoTime() / 1000 / 1000; + } +}
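The nanoTime-derived tick is the standard way to measure intervals immune to wall-clock jumps. A self-contained check of the same derivation used by currentTimeMillis() above:

    public final class MonotonicClockDemo {
      /** Same nanoseconds-to-milliseconds derivation as ReadBufferManager.currentTimeMillis(). */
      static long monotonicNowMillis() {
        return System.nanoTime() / 1_000_000;
      }

      public static void main(String[] args) throws InterruptedException {
        long start = monotonicNowMillis();
        Thread.sleep(50);
        long elapsed = monotonicNowMillis() - start;
        // elapsed is a valid interval even if the wall clock were reset mid-sleep
        System.out.println("elapsed ~ " + elapsed + " ms");
      }
    }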
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java new file mode 100644 index 0000000..2d0c96e --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +
+package org.apache.hadoop.fs.azurebfs.services; + +import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; + +import java.util.concurrent.CountDownLatch; +
+class ReadBufferWorker implements Runnable { + + protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1); + private int id; +
+ ReadBufferWorker(final int id) { + this.id = id; + } +
+ /** + * Returns the ID of this ReadBufferWorker. + */ + public int getId() { + return this.id; + } +
+ /** + * Waits until a buffer becomes available in the read-ahead queue. + * Once a buffer becomes available, reads the file specified in it and then posts results back to the buffer manager. + * Rinse and repeat. Forever. + */ + public void run() { + try { + UNLEASH_WORKERS.await(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); + ReadBuffer buffer; + while (true) { + try { + buffer = bufferManager.getNextBlockToRead(); // blocks, until a buffer is available for this thread + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return; + } + if (buffer != null) { + try { + // do the actual read, from the file. + int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength()); + bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager + } catch (Exception ex) { + bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0); + } + } + } + } +}
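Since the prefetch threads are daemons parked in getNextBlockToRead, the only clean exit from the loop above is an interrupt; the patch itself relies on daemon status at JVM shutdown rather than ever interrupting its workers. A minimal, self-contained sketch of the same interrupt-driven loop shape (illustrative only):

    import java.util.LinkedList;
    import java.util.Queue;

    public final class InterruptibleWorkerDemo {
      private final Queue<Runnable> queue = new LinkedList<>();

      private synchronized Runnable next() throws InterruptedException {
        while (queue.isEmpty()) {
          wait();                            // same wait/notify shape as getNextBlockToRead
        }
        return queue.remove();
      }

      public synchronized void submit(Runnable task) {
        queue.add(task);
        notifyAll();
      }

      public Thread start() {
        Thread t = new Thread(() -> {
          while (true) {
            try {
              next().run();
            } catch (InterruptedException ex) {
              Thread.currentThread().interrupt(); // restore the flag and exit, as run() does
              return;
            }
          }
        });
        t.setDaemon(true);                   // dies with the JVM, like the prefetch threads
        t.start();
        return t;
      }

      public static void main(String[] args) throws InterruptedException {
        InterruptibleWorkerDemo w = new InterruptibleWorkerDemo();
        Thread t = w.start();
        w.submit(() -> System.out.println("task ran"));
        Thread.sleep(100);
        t.interrupt();                       // the clean shutdown path
      }
    }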