cameronlee314 commented on a change in pull request #1239: SAMZA-2421: Add 
SystemProducer for Azure Blob Storage
URL: https://github.com/apache/samza/pull/1239#discussion_r361019827
 
 

 ##########
 File path: 
samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
 ##########
 @@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.azureblob.avro;
+
+import com.azure.storage.blob.BlobContainerAsyncClient;
+import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.samza.system.azureblob.compression.Compression;
+import org.apache.samza.system.azureblob.compression.GzipCompression;
+import org.apache.samza.system.azureblob.producer.AzureBlobWriter;
+import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements {@link 
org.apache.samza.system.azureblob.producer.AzureBlobWriter}
+ * for writing avro records to Azure Blob Storage.
+ *
+ * It uses {@link org.apache.avro.file.DataFileWriter} to convert avro records 
it receives to byte[].
+ * This byte[] is passed on to {@link 
org.apache.samza.system.azureblob.avro.AzureBlobOutputStream}.
+ * AzureBlobOutputStream in turn uploads data to Storage as a blob.
+ *
+ * It also accepts encoded records as byte[] as long as the first 
OutgoingMessageEnvelope this writer receives
+ * is a decoded record from which to get the schema and record type 
(GenericRecord vs SpecificRecord).
+ * The subsequent encoded records are written directly to 
AzureBlobOutputStream without checking if they conform
+ * to the schema. It is the responsibility of the user to ensure this. Failing 
to do so may result in an
+ * unreadable avro blob.
+ *
+ * It expects all OutgoingMessageEnvelopes to be of the same schema.
+ * To handle schema evolution (sending envelopes of different schema), this 
writer has to be closed and a new writer
+ * has to be created. The first envelope of the new writer should contain a 
valid record to get schema from.
+ * If used by AzureBlobSystemProducer, this is done through 
systemProducer.flush(source).
+ *
+ * Once closed, this object cannot be used.
+ * This is a thread-safe class.
+ *
+ * If the number of records or size of the current blob exceeds the specified 
limits then a new blob is created.
+ */
+public class AzureBlobAvroWriter implements AzureBlobWriter {
+  private static final Logger LOG = 
LoggerFactory.getLogger(AzureBlobAvroWriter.class);
+  private static final String PUBLISHED_FILE_NAME_DATE_FORMAT = 
"yyyy/MM/dd/HH/mm-ss";
+  private static final String BLOB_NAME_AVRO = "%s/%s-%s.avro%s";
+  private static final String BLOB_NAME_RANDOM_STRING_AVRO = 
"%s/%s-%s-%s.avro%s";
+  private static final SimpleDateFormat UTC_FORMATTER = buildUTCFormatter();
+
+  // Avro's DataFileWriter has internal buffers and also adds metadata.
+  // Based on the current default sizes of these buffers and metadata, the 
data overhead is a little less than 100KB
+  // However, taking the overhead to be capped at 1MB to ensure enough room if 
the default values are increased.
+  static final long DATAFILEWRITER_OVERHEAD = 1000000; // 1MB
+
+  // currentBlobWriterComponents == null only for the first blob immediately 
after this AzureBlobAvroWriter object has been created.
+  // For the rest of this object's lifecycle, currentBlobWriterComponents is not null.
+  private BlobWriterComponents currentBlobWriterComponents = null;
+  private final List<BlobWriterComponents> allBlobWriterComponents = new 
ArrayList<>();
+  private Schema schema = null;
+  // datumWriter == null only for the first blob immediately after this 
AzureBlobAvroWriter object has been created.
+  // It is created from the schema taken from the first 
OutgoingMessageEnvelope. Hence the first OME has to be a decoded avro record.
+  // For the rest of this object's lifecycle, datumWriter is not null.
+  private DatumWriter<IndexedRecord> datumWriter = null;
 
 Review comment:
   Ah yes, if you are protecting all access (both reads and writes) with locks (which 
implement `Lock`), then it looks like you are good.
   
https://stackoverflow.com/questions/1570589/is-the-volatile-keyword-required-for-fields-accessed-via-a-reentrantlock
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to