cameronlee314 commented on a change in pull request #1239: SAMZA-2421: Add SystemProducer for Azure Blob Storage URL: https://github.com/apache/samza/pull/1239#discussion_r360972241
########## File path: samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java ########## @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.azureblob.avro; + +import com.azure.storage.blob.specialized.BlockBlobAsyncClient; +import com.google.common.annotations.VisibleForTesting; +import org.apache.samza.system.azureblob.compression.Compression; +import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Base64; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; + +/** + * This class extends {@link java.io.OutputStream} and 
uses {@link java.io.ByteArrayOutputStream} + * for caching the write calls until upload is called. + * + * It asynchronously uploads the blocks and waits on them to finish at close. + * The blob is persisted at close. + * + * flush must be explicitly called before close. + * Any writes after a flush and before a close will be lost if no flush is called just before close. + * Once closed, this object cannot be used. + * + * releaseBuffer releases the underlying buffer, i.e. the ByteArrayOutputStream which holds the data written until it is flushed. + * flush must be explicitly called prior to releaseBuffer else all data written + * since the beginning/previous flush will be lost. + * No data can be written after releaseBuffer; flush after releaseBuffer is a no-op + * and close must still be invoked to wait for all pending uploads to finish and persist the blob. + * releaseBuffer is optional and may be called after its last flush and before close (which might happen much later), + * so as to reduce the overall memory footprint. close cannot replace releaseBuffer as it is a blocking call. + * + * This library is thread-safe. + */ +public class AzureBlobOutputStream extends OutputStream { + + private static final Logger LOG = LoggerFactory.getLogger(AzureBlobOutputStream.class); + private static final int MAX_ATTEMPT = 3; + private static final int MAX_BLOCKS_IN_AZURE_BLOB = 50000; + public static final String BLOB_RAW_SIZE_BYTES_METADATA = "rawSizeBytes"; + private final long flushTimeoutMs; + private final BlockBlobAsyncClient blobAsyncClient; + private final Executor blobThreadPool; + private Optional<ByteArrayOutputStream> byteArrayOutputStream; Review comment: Should this be `volatile`? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
