cameronlee314 commented on a change in pull request #1239: SAMZA-2421: Add SystemProducer for Azure Blob Storage
URL: https://github.com/apache/samza/pull/1239#discussion_r360975885
 
 

 ##########
 File path: samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
 ##########
 @@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.azureblob.avro;
+
+import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.system.azureblob.compression.Compression;
+import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
+
+/**
+ * This class extends {@link java.io.OutputStream} and uses {@link java.io.ByteArrayOutputStream}
+ * for caching the write calls until upload is called.
+ *
+ * It asynchronously uploads the blocks and waits on them to finish at close.
+ * The blob is persisted at close.
+ *
+ * flush must be explicitly called before close.
+ * Any writes made after the last flush and before close will be lost.
+ * Once closed, this object can not be used.
+ *
+ * releaseBuffer releases the underlying buffer, i.e. the ByteArrayOutputStream which holds the data written until it is flushed.
+ * flush must be explicitly called prior to releaseBuffer, else all data written
+ * since the beginning/previous flush will be lost.
+ * No data can be written after releaseBuffer, flush after releaseBuffer is a no-op,
+ * and close must still be invoked to wait for all pending uploads to finish and persist the blob.
+ * releaseBuffer is optional and may be called after the last flush and before close (which might happen much later),
+ * so as to reduce the overall memory footprint. close can not replace releaseBuffer, as close is a blocking call.
+ *
+ * This class is thread safe.
+ */
+public class AzureBlobOutputStream extends OutputStream {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AzureBlobOutputStream.class);
+  private static final int MAX_ATTEMPT = 3;
+  private static final int MAX_BLOCKS_IN_AZURE_BLOB = 50000;
+  public static final String BLOB_RAW_SIZE_BYTES_METADATA = "rawSizeBytes";
+  private final long flushTimeoutMs;
+  private final BlockBlobAsyncClient blobAsyncClient;
+  private final Executor blobThreadPool;
+  private Optional<ByteArrayOutputStream> byteArrayOutputStream;
+  // All the block names should be explicitly present in the blockList during CommitBlockList,
+  // even if stageBlock is a blocking call.
+  private final ArrayList<String> blockList;
+  private final Set<CompletableFuture<Void>> pendingUpload = ConcurrentHashMap.newKeySet();
+  private final int maxBlockFlushThresholdSize;
+  private final AzureBlobWriterMetrics metrics;
+  private final Compression compression;
+
+  private volatile boolean isClosed = false;
+  private long totalUploadedBlockSize = 0;
+  private int blockNum;
+
+  public AzureBlobOutputStream(BlockBlobAsyncClient blobAsyncClient, Executor blobThreadPool, AzureBlobWriterMetrics metrics,
 
 Review comment:
   This looks pretty similar to your VisibleForTesting constructor. Consider using the VisibleForTesting constructor here, i.e. `this(..., ..., ...)`.
   It will help to ensure consistency between the constructors.
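   For illustration, a minimal constructor-chaining sketch of that suggestion. The parameter order and the exact @VisibleForTesting signature are assumptions based on the fields shown in the diff above, not the actual PR code; imports and remaining fields come from the file itself:

```java
// Hypothetical sketch of the suggested delegation; not the PR's actual constructors.
public AzureBlobOutputStream(BlockBlobAsyncClient blobAsyncClient, Executor blobThreadPool,
    AzureBlobWriterMetrics metrics, long flushTimeoutMs, int maxBlockFlushThresholdSize,
    Compression compression) {
  // Delegate to the @VisibleForTesting constructor, supplying a freshly created buffer,
  // so field initialization lives in exactly one place.
  this(blobAsyncClient, blobThreadPool, metrics, flushTimeoutMs, maxBlockFlushThresholdSize,
      new ByteArrayOutputStream(maxBlockFlushThresholdSize), compression);
}

@VisibleForTesting
AzureBlobOutputStream(BlockBlobAsyncClient blobAsyncClient, Executor blobThreadPool,
    AzureBlobWriterMetrics metrics, long flushTimeoutMs, int maxBlockFlushThresholdSize,
    ByteArrayOutputStream byteArrayOutputStream, Compression compression) {
  this.blobAsyncClient = blobAsyncClient;
  this.blobThreadPool = blobThreadPool;
  this.metrics = metrics;
  this.flushTimeoutMs = flushTimeoutMs;
  this.maxBlockFlushThresholdSize = maxBlockFlushThresholdSize;
  this.byteArrayOutputStream = Optional.of(byteArrayOutputStream);
  this.compression = compression;
  this.blockList = new ArrayList<>();
  this.blockNum = 0;
}
```

   Chaining this way means the public constructor cannot drift out of sync with the test constructor, since both go through the same initialization path.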

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
