cameronlee314 commented on a change in pull request #1239: SAMZA-2421: Add SystemProducer for Azure Blob Storage
URL: https://github.com/apache/samza/pull/1239#discussion_r360993457
########## File path: samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java ##########
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.azureblob.producer;
+
+import com.azure.core.http.HttpClient;
+import com.azure.core.http.HttpResponse;
+import com.azure.core.http.ProxyOptions;
+import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
+import com.azure.core.http.policy.HttpLogDetailLevel;
+import com.azure.core.http.policy.HttpLogOptions;
+import com.azure.core.util.Configuration;
+import com.azure.storage.blob.BlobContainerAsyncClient;
+import com.azure.storage.blob.BlobServiceAsyncClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobErrorCode;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.models.SkuName;
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.samza.system.azureblob.AzureBlobConfig;
+import org.apache.samza.system.azureblob.compression.CompressionFactory;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemProducerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * AzureBlob system producer to send messages to Azure Blob Storage.
+ * This system producer is thread safe.
+ * For different sources: sends/flushes can happen in parallel.
+ * For the same source: sends can happen in parallel; flushes are exclusive.
+ *
+ * Azure Blob Storage has a 3-level hierarchy: an Azure account contains multiple containers (akin to directories
+ * in a file system) and each container has multiple blobs (akin to files).
+ *
+ * Azure Container: the system name maps to the name of the Azure container.
+ * An instance of this system producer writes to a single Azure container, treating the container as a system.
+ *
+ * Azure Blob: for a given stream-partition pair, a blob is created with the name stream/partition/timestamp-randomString.
+ * The stream and partition are extracted from the SSP of the OutgoingMessageEnvelope in send().
+ * A blob is started when the first message for that stream-partition is sent by a source
+ * and closed during flush for that source.
+ * Subsequent sends by the source to the same stream-partition will create a new blob with a different timestamp.
+ * Thus, the timestamp corresponds to the writer creation time, i.e., the first send for a source-SSP
+ * or the first send after a flush for the source.
+ * If a max blob size or record limit is configured, then a new blob is started when the limit is exceeded.
+ *
+ * A random string is used as a suffix in the blob name to prevent collisions:
+ *   - if two system producers are writing to the same SSP.
+ *   - if two sources send to the same SSP.
+ *
+ * Lifecycle of the system producer is shown below. All sources have to be registered before starting the producer.
+ * Several messages can be sent by a source via send(source, envelope). This can be followed by a flush(source) or stop().
+ * After flush(source), more messages can be sent for that source and for other sources as well. stop() internally calls
+ * flush(source) for all the registered sources. After stop(), no calls to send or flush are allowed.
+ *
+ *                                                       ┌──────────────────────────────┐
+ *                                                       │                              │
+ *                                                       ▼                              │
+ * Lifecycle: register(source) ────────▶ start() ──────▶ send(source, envelope) ──────▶ flush(source) ──────▶ stop()
+ *            [multiple times                             │    ▲         │                                     ▲
+ *             for                                        └────┘         └─────────────────────────────────────┘
+ *             multiple sources]
+ *
+ * This SystemProducer does not open up the envelopes sent through it. It is the responsibility of the user of this
+ * SystemProducer to ensure the envelopes are valid and that a correct writer has been chosen by wiring up the
+ * writer factory config.
+ */
+public class AzureBlobSystemProducer implements SystemProducer {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AzureBlobSystemProducer.class.getName());
+
+  private static final String BLOB_NAME_PREFIX = "%s";
+  private static final String BLOB_NAME_PARTITION_PREFIX = "%s/%s";
+
+  private static final String AZURE_URL = "https://%s.blob.core.windows.net";
+
+  private static final int PREMIUM_MAX_BLOCK_SIZE = 100 * 1024 * 1024; // 100MB
+  private static final int STANDARD_MAX_BLOCK_SIZE = 4 * 1024 * 1024; // 4MB
+
+  private BlobContainerAsyncClient containerAsyncClient;
+  private final String systemName;
+  private final AzureBlobConfig config;
+
+  // Map of writers indexed first by sourceName and then by (streamName, partitionName),
+  // or by just streamName if the partition key does not exist.
+  private Map<String, Map<String, AzureBlobWriter>> writerMap;
+  private final AzureBlobWriterFactory writerFactory;
+
+  private final int blockFlushThresholdSize;
+  private final long flushTimeoutMs;
+  private final long closeTimeout;
+  private final ThreadPoolExecutor asyncBlobThreadPool;
+
+  private volatile boolean isStarted = false;
+  private volatile boolean isStopped = false;
+
+  private final AzureBlobSystemProducerMetrics metrics;
+
+  private final Map<String, Object> sourceWriterCreationLockMap = new ConcurrentHashMap<>();
+  private final Map<String, ReadWriteLock> sourceSendFlushLockMap = new ConcurrentHashMap<>();
+
+  public AzureBlobSystemProducer(String systemName, AzureBlobConfig config, MetricsRegistry metricsRegistry) {
+    Preconditions.checkNotNull(systemName, "System name can not be null when creating AzureBlobSystemProducer");
+    Preconditions.checkNotNull(config, "Config can not be null when creating AzureBlobSystemProducer");
+    Preconditions.checkNotNull(metricsRegistry, "Metrics registry can not be null when creating AzureBlobSystemProducer");
+
+    // Azure logs do not show without this property set
+    System.setProperty(Configuration.PROPERTY_AZURE_LOG_LEVEL, "1");
+    this.systemName = systemName;
+    this.config = config;
+
+    String writerFactoryClassName = this.config.getAzureBlobWriterFactoryClassName(this.systemName);
+    try {
+      this.writerFactory = (AzureBlobWriterFactory) Class.forName(writerFactoryClassName).newInstance();
+    } catch (Exception e) {
+      throw new SystemProducerException("Could not create writer factory with name " + writerFactoryClassName, e);
+    }
+    this.flushTimeoutMs = this.config.getFlushTimeoutMs(this.systemName);
+    this.closeTimeout = this.config.getCloseTimeoutMs(this.systemName);
+    this.blockFlushThresholdSize = this.config.getMaxFlushThresholdSize(this.systemName);
+    int asyncBlobThreadPoolCount = this.config.getAzureBlobThreadPoolCount(this.systemName);
+    int blockingQueueSize = this.config.getBlockingQueueSizeOrDefault(this.systemName, asyncBlobThreadPoolCount * 2);
+
+    LOG.info("SystemName: {} block flush size: {}", systemName, this.blockFlushThresholdSize);
+    LOG.info("SystemName: {} thread count: {}", systemName, asyncBlobThreadPoolCount);
+
+    BlockingQueue<Runnable> linkedBlockingDeque = new LinkedBlockingDeque<>(blockingQueueSize);
+
+    this.asyncBlobThreadPool =
+        new ThreadPoolExecutor(asyncBlobThreadPoolCount, asyncBlobThreadPoolCount, 60,
+            TimeUnit.SECONDS, linkedBlockingDeque, new ThreadPoolExecutor.CallerRunsPolicy());
+
+    this.writerMap = new ConcurrentHashMap<>();
+
+    this.metrics = new AzureBlobSystemProducerMetrics(systemName, config.getAzureAccountName(systemName), metricsRegistry);
+  }
+
+  /**
+   * {@inheritDoc}
+   * @throws SystemProducerException
+   */
+  @Override
+  public synchronized void start() {
+    if (isStarted) {
+      throw new SystemProducerException("Attempting to start an already started producer.");
+    }
+
+    String accountName = config.getAzureAccountName(systemName);
+    String accountKey = config.getAzureAccountKey(systemName);
+
+    setupAzureContainer(accountName, accountKey);
+
+    LOG.info("Starting producer.");
+    isStarted = true;
+  }
+
+  /**
+   * {@inheritDoc}
+   * @throws SystemProducerException
+   */
+  @Override
+  public synchronized void stop() {
+    if (!isStarted) {
+      throw new SystemProducerException("Attempting to stop a producer that was not started.");
+    }
+
+    if (isStopped) {
+      throw new SystemProducerException("Attempting to stop an already stopped producer.");
+    }
+
+    try {
+      writerMap.forEach((source, sourceWriterMap) -> flush(source));
+      asyncBlobThreadPool.shutdown();
+      isStarted = false;
+    } catch (Exception e) {
+      throw new SystemProducerException("Stop failed with exception.", e);
+    } finally {
+      writerMap.clear();
+      isStopped = true;
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * @throws SystemProducerException
+   */
+  @Override
+  public synchronized void register(String source) {
+    LOG.info("Registering source {}", source);
+    if (isStarted) {
+      throw new SystemProducerException("Cannot register once the producer is started.");
+    }
+    if (writerMap.containsKey(source)) {
+      // source already registered => writerMap and metrics have entries for the source
+      LOG.warn("Source: {} already registered", source);
+      return;
+    }
+    writerMap.put(source, new ConcurrentHashMap<>());
+    sourceWriterCreationLockMap.put(source, new Object());
+    sourceSendFlushLockMap.put(source, new ReentrantReadWriteLock());
+    metrics.register(source);
+  }
+
+  /**
+   * Multi-threading and thread-safety:
+   *
+   * From Samza's usage of SystemProducer:
+   * The lifecycle of SystemProducer shown above is consistent with most use cases within Samza (with the exception of
+   * the Coordinator stream store/producer and KafkaCheckpointManager).
+   * A single parent thread creates the SystemProducer, registers all sources and starts it before handing it
+   * to multiple threads for use (send and flush). Finally, the single parent thread stops the producer.
+   * The most frequent operations on a SystemProducer are send and flush, while register, start and stop are one-time operations.
+   *
+   * Based on this usage pattern: to provide multi-threaded support and improve the throughput of this SystemProducer,
+   * multiple sends and flushes need to happen in parallel. However, the following rules are needed to ensure
+   * no data loss and data consistency.
+   *   1. sends can happen in parallel for the same source or different sources.
+   *   2. send and flush for the same source can not happen in parallel. Although the AzureBlobWriter is thread safe,
+   *      interleaving the write, flush and close operations of a writer can lead to data loss if a write happens between
+   *      flush and close. There are other such scenarios, e.g. issuing a write to the writer after close.
+   *   3. writer creation for the same writer key (SSP) can not happen in parallel - for the reason that multiple
+   *      writers could get created with only one being retained but all being used and GCed after a send, leading to data loss.
+   *
+   * These 3 rules are achieved by using a per-source ReadWriteLock to allow sends in parallel but guarantee exclusivity for flush.
+   * Additionally, a per-source lock is used to ensure writer creation happens in a critical section.
+   *
+   * Concurrent access to shared objects is handled as follows:
+   *   1. Concurrent access to AzureBlobWriters is permitted as long as there is no interleaving of operations for a writer.
+   *      If multiple operations on a writer are needed (as in flush), make them synchronized.
+   *   2. ConcurrentHashMaps (esp. the per-source writerMap) get and put - disallow interleaving by doing put and clear under locks.
+   *   3. WriterFactory and Metrics are thread-safe. WriterFactory is stateless, while interleaving Metrics operations
+   *      is thread-safe too as they work on different counters.
+   * The above locking mechanisms ensure thread-safety.
+   * {@inheritDoc}
+   * @throws SystemProducerException
+   */
+  @Override
+  public void send(String source, OutgoingMessageEnvelope messageEnvelope) {
+    if (!isStarted) {
+      throw new SystemProducerException("Trying to send before producer has started.");
+    }
+    ReadWriteLock lock = sourceSendFlushLockMap.get(source);
+    if (lock == null) {
+      throw new SystemProducerException("Attempting to send to source: " + source + " but it was not registered");
+    }
+    lock.readLock().lock();
+    try {
+      AzureBlobWriter writer = getOrCreateWriter(source, messageEnvelope);
+      writer.write(messageEnvelope);
+      metrics.updateWriteMetrics(source);
+    } catch (Exception e) {
+      metrics.updateErrorMetrics(source);
+      Object partitionKey = getPartitionKey(messageEnvelope);
+      String msg = "Send failed for source: " + source + ", system: " + systemName
+          + ", stream: " + messageEnvelope.getSystemStream().getStream()
+          + ", partitionKey: " + ((partitionKey != null) ? partitionKey : "null");
+      throw new SystemProducerException(msg, e);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * @throws SystemProducerException
+   */
+  @Override
+  public void flush(String source) {
+    if (!isStarted) {
+      throw new SystemProducerException("Trying to flush before producer has started.");
+    }
+    ReadWriteLock lock = sourceSendFlushLockMap.get(source);
+    if (lock == null) {
+      throw new SystemProducerException("Attempting to flush source: " + source + " but it was not registered");
+    }
+    lock.writeLock().lock();
+    Map<String, AzureBlobWriter> sourceWriterMap = writerMap.get(source);
+    try {
+      // first flush all the writers
+      // then close and remove all the writers
+      flushWriters(sourceWriterMap);
+      closeWriters(source, sourceWriterMap);
+    } catch (Exception e) {
+      metrics.updateErrorMetrics(source);
+      throw new SystemProducerException("Flush failed for system: " + systemName + " and source: " + source, e);
+    } finally {
+      sourceWriterMap.clear();
+      lock.writeLock().unlock();
+    }
+  }
+
+  @VisibleForTesting
+  void setupAzureContainer(String accountName, String accountKey) {
+    try {
+      // Use the Azure Blob Storage account's name and key to create a credential object to access the account.
+      StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
+
+      HttpClient httpClient;
+      if (config.getUseProxy(systemName)) {
+        LOG.info("HTTP proxy setup for AzureBlob pipeline");
+        httpClient = new NettyAsyncHttpClientBuilder()
+            .proxy(new ProxyOptions(ProxyOptions.Type.HTTP,
+                new InetSocketAddress(config.getAzureProxyHostname(systemName), config.getAzureProxyPort(systemName)))).build();
+      } else {
+        httpClient = HttpClient.createDefault();
+      }
+
+      // From the Azure portal, get your Storage account's blob service endpoint.
+      String endpoint = String.format(Locale.ROOT, AZURE_URL, accountName);
+
+      HttpLogOptions httpLogOptions = new HttpLogOptions();
+      httpLogOptions.setLogLevel(HttpLogDetailLevel.BASIC);
+      BlobServiceAsyncClient storageClient =
+          new BlobServiceClientBuilder()
+              .httpLogOptions(httpLogOptions)
+              .endpoint(endpoint)
+              .credential(credential)
+              .httpClient(httpClient)
+              .buildAsyncClient();
+
+      SkuName accountType = storageClient.getAccountInfo().block().getSkuName();
+      long flushThresholdSize = config.getMaxFlushThresholdSize(systemName);
+      boolean isPremiumAccount = SkuName.PREMIUM_LRS == accountType;
+      if (isPremiumAccount && flushThresholdSize > PREMIUM_MAX_BLOCK_SIZE) { // 100 MB
+        throw new SystemProducerException("Azure storage account with name: " + accountName
+            + " is a premium account and can only handle up to a 100MB threshold size. Given flush threshold size is "
+            + flushThresholdSize);
+      } else if (!isPremiumAccount && flushThresholdSize > STANDARD_MAX_BLOCK_SIZE) { // STANDARD account
+        throw new SystemProducerException("Azure storage account with name: " + accountName
+            + " is a standard account and can only handle up to a 4MB threshold size. Given flush threshold size is "
+            + flushThresholdSize);
+      }
+
+      containerAsyncClient = storageClient.getBlobContainerAsyncClient(systemName);
+
+      // The only way to check whether a container exists is to create it and look for failure/success.
+      createContainerIfNotExists(containerAsyncClient);
+    } catch (Exception e) {
+      metrics.updateAzureContainerMetrics();
+      throw new SystemProducerException("Failed to set up Azure container for SystemName: " + systemName, e);
+    }
+  }
+
+  /**
+   * Find the writer in the writerMap, else create one.
+   * @param source for which to find/create the writer
+   * @param messageEnvelope to fetch the schema from if a writer needs to be created
+   * @return an AzureBlobWriter object
+   */
+  @VisibleForTesting
+  AzureBlobWriter getOrCreateWriter(String source, OutgoingMessageEnvelope messageEnvelope) {
+    String writerMapKey;
+    String blobURLPrefix;
+    String partitionKey = getPartitionKey(messageEnvelope);
+    // using most significant bits in UUID (8 digits) to avoid collisions in blob names
+    if (partitionKey == null) {
+      writerMapKey = messageEnvelope.getSystemStream().getStream();
+      blobURLPrefix = String.format(BLOB_NAME_PREFIX, messageEnvelope.getSystemStream().getStream());
+    } else {
+      writerMapKey = messageEnvelope.getSystemStream().getStream() + "/" + partitionKey;
+      blobURLPrefix = String.format(BLOB_NAME_PARTITION_PREFIX, messageEnvelope.getSystemStream().getStream(), partitionKey);
+    }
+    Map<String, AzureBlobWriter> sourceWriterMap = writerMap.get(source);
+    if (sourceWriterMap == null) {
+      throw new SystemProducerException("Attempting to send to source: " + source + " but it is not registered");
+    }
+    AzureBlobWriter writer = sourceWriterMap.get(writerMapKey);
+    if (writer == null) {
+      synchronized (sourceWriterCreationLockMap.get(source)) {
+        writer = sourceWriterMap.get(writerMapKey);
+        if (writer == null) {
+          AzureBlobWriterMetrics writerMetrics =
+              new AzureBlobWriterMetrics(metrics.getAggregateMetrics(), metrics.getSystemMetrics(), metrics.getSourceMetrics(source));
+          writer = createNewWriter(blobURLPrefix, writerMetrics);
+          sourceWriterMap.put(writerMapKey, writer);
+        }
+      }
+    }
+    return writer;
+  }
+
+  private void createContainerIfNotExists(BlobContainerAsyncClient containerClient) {
+    try {
+      containerClient.create().block();
+    } catch (BlobStorageException e) {
+      // BlobErrorCode defines constants corresponding to all error codes returned by the service.
+      if (e.getErrorCode() == BlobErrorCode.RESOURCE_NOT_FOUND) {
+        HttpResponse response = e.getResponse();
+        LOG.error("Error creating the container url " + containerClient.getBlobContainerUrl().toString()
+            + " with status code: " + response.getStatusCode(), e);
+      } else if (e.getErrorCode() == BlobErrorCode.CONTAINER_BEING_DELETED) {
+        LOG.error("Container is being deleted. Container URL is: " + containerClient.getBlobContainerUrl().toString(), e);
+      } else if (e.getErrorCode() == BlobErrorCode.CONTAINER_ALREADY_EXISTS) {
+        return;
+      }
+      throw e;
+    }
+  }
+
+  private String getPartitionKey(OutgoingMessageEnvelope messageEnvelope) {
+    Object partitionKey = messageEnvelope.getPartitionKey();
+    if (partitionKey == null || !(partitionKey instanceof String)) {
+      return null;
+    }
+    return (String) partitionKey;
+  }
+
+  private void flushWriters(Map<String, AzureBlobWriter> sourceWriterMap) {
+    sourceWriterMap.forEach((stream, writer) -> {
+      try {
+        LOG.info("Flushing topic: {}", stream);
+        writer.flush();
+      } catch (IOException e) {
+        throw new SystemProducerException("Flush failed for topic " + stream, e);
+      }
+    });
+  }
+
+  private void closeWriters(String source, Map<String, AzureBlobWriter> sourceWriterMap) throws Exception {
+    Set<CompletableFuture<Void>> pendingClose = ConcurrentHashMap.newKeySet();
+    try {
+      sourceWriterMap.forEach((stream, writer) -> {
+        LOG.info("Closing topic: {}", stream);
+        CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              writer.close();
+            } catch (IOException e) {
+              throw new SystemProducerException("Close failed for topic " + stream, e);
+            }
+          }
+        }, asyncBlobThreadPool);
+        pendingClose.add(future);
+        future.handle((aVoid, throwable) -> {
+          sourceWriterMap.remove(stream);
+          if (throwable != null) {
+            throw new SystemProducerException("Close failed for topic " + stream, throwable);
+          } else {
+            LOG.info("Blob close finished for stream " + stream);
+            return aVoid;
+          }
+        });
+      });
+      CompletableFuture<Void> future = CompletableFuture.allOf(pendingClose.toArray(new CompletableFuture[0]));
+      LOG.info("Flush source: {} has pending closes: {}", source, pendingClose.size());
+      future.get(closeTimeout, TimeUnit.MILLISECONDS);
+    } finally {
+      pendingClose.clear();
+    }
+  }
+
+  @VisibleForTesting
+  AzureBlobWriter createNewWriter(String blobURL, AzureBlobWriterMetrics writerMetrics) {
+    try {
+      return writerFactory.getWriterInstance(containerAsyncClient, blobURL, asyncBlobThreadPool, writerMetrics,
+          blockFlushThresholdSize, flushTimeoutMs,
+          new CompressionFactory().getCompression(config.getCompressionType(systemName)),

Review comment:
   Can you make `CompressionFactory` into a singleton?
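   For illustration, a minimal sketch of what a singleton `CompressionFactory` could look like. This is only a
   sketch under assumptions: the factory is stateless, and the `CompressionType` enum and the concrete
   `NoneCompression`/`GzipCompression` types match what this PR's compression package actually provides.

   ```java
   public class CompressionFactory {
     // Eagerly-initialized singleton: the factory holds no mutable state, so one shared
     // instance is thread safe and avoids allocating a new factory per writer.
     private static final CompressionFactory INSTANCE = new CompressionFactory();

     private CompressionFactory() {
     }

     public static CompressionFactory getInstance() {
       return INSTANCE;
     }

     public Compression getCompression(CompressionType compressionType) {
       switch (compressionType) {
         case GZIP:
           return new GzipCompression(); // assumed concrete type from this PR's compression package
         case NONE:
         default:
           return new NoneCompression(); // assumed concrete type from this PR's compression package
       }
     }
   }
   ```

   The call in `createNewWriter` above would then become
   `CompressionFactory.getInstance().getCompression(config.getCompressionType(systemName))`.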
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services