cameronlee314 commented on a change in pull request #1239: SAMZA-2421: Add SystemProducer for Azure Blob Storage
URL: https://github.com/apache/samza/pull/1239#discussion_r363415832
########## File path: samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java ##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.azureblob.avro;
+
+import com.azure.storage.blob.BlobContainerAsyncClient;
+import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.samza.system.azureblob.compression.Compression;
+import org.apache.samza.system.azureblob.producer.AzureBlobWriter;
+import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements {@link org.apache.samza.system.azureblob.producer.AzureBlobWriter}
+ * for writing Avro records to Azure Blob Storage.
+ *
+ * It uses {@link org.apache.avro.file.DataFileWriter} to convert the Avro records it receives to byte[].
+ * This byte[] is passed on to {@link org.apache.samza.system.azureblob.avro.AzureBlobOutputStream}.
+ * AzureBlobOutputStream in turn uploads the data to Storage as a blob.
+ *
+ * It also accepts already encoded records as byte[], as long as the first OutgoingMessageEnvelope this writer receives
+ * is a decoded record from which to get the schema and record type (GenericRecord vs SpecificRecord).
+ * Subsequent encoded records are written directly to AzureBlobOutputStream without checking whether they conform
+ * to the schema. It is the responsibility of the user to ensure this. Failing to do so may result in an
+ * unreadable Avro blob.
+ *
+ * It expects all OutgoingMessageEnvelopes to be of the same schema.
+ * To handle schema evolution (sending envelopes of a different schema), this writer has to be closed and a new writer
+ * has to be created. The first envelope given to the new writer should contain a valid record from which to get the schema.
+ * If used by AzureBlobSystemProducer, this is done through systemProducer.flush(source).
+ *
+ * Once closed, this object cannot be used.
+ * This is a thread-safe class.
+ *
+ * If the number of records in or the size of the current blob exceeds the specified limits, then a new blob is created.
+ */
+public class AzureBlobAvroWriter implements AzureBlobWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(AzureBlobAvroWriter.class);
+  private static final String PUBLISHED_FILE_NAME_DATE_FORMAT = "yyyy/MM/dd/HH/mm-ss";
+  private static final String BLOB_NAME_AVRO = "%s/%s-%s.avro%s";
+  private static final String BLOB_NAME_RANDOM_STRING_AVRO = "%s/%s-%s-%s.avro%s";
+  private static final SimpleDateFormat UTC_FORMATTER = buildUTCFormatter();
+
+  // Avro's DataFileWriter has internal buffers and also adds metadata.
+  // Based on the current default sizes of these buffers and metadata, the data overhead is a little less than 100KB.
+  // However, the overhead is capped at 1MB here to ensure enough room if the default values are increased.
+  static final long DATAFILEWRITER_OVERHEAD = 1000000; // 1MB
+
+  // currentBlobWriterComponents == null only for the first blob, immediately after this AzureBlobAvroWriter object has been created.
+  // For the rest of this object's lifecycle, currentBlobWriterComponents is not null.
+  private BlobWriterComponents currentBlobWriterComponents = null;
+  private final List<BlobWriterComponents> allBlobWriterComponents = new ArrayList<>();
+  private Schema schema = null;
+  // datumWriter == null only for the first blob, immediately after this AzureBlobAvroWriter object has been created.
+  // It is created from the schema taken from the first OutgoingMessageEnvelope. Hence the first OME has to be a decoded Avro record.
+  // For the rest of this object's lifecycle, datumWriter is not null.
+  private DatumWriter<IndexedRecord> datumWriter = null;
+  private volatile boolean isClosed = false;
+
+  private final Executor blobThreadPool;
+  private final AzureBlobWriterMetrics metrics;
+  private final int maxBlockFlushThresholdSize;
+  private final long flushTimeoutMs;
+  private final Compression compression;
+  private final BlobContainerAsyncClient containerAsyncClient;
+  private final String blobURLPrefix;
+  private final long maxBlobSize;
+  private final long maxRecordsPerBlob;
+  private final boolean useRandomStringInBlobName;
+  private final Object currentDataFileWriterLock = new Object();
+  private volatile long blobNumber = 0;
+  private volatile long recordsInCurrentBlob = 0;
+
+  public AzureBlobAvroWriter(BlobContainerAsyncClient containerAsyncClient, String blobURLPrefix,
+      Executor blobThreadPool, AzureBlobWriterMetrics metrics,
+      int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
+      long maxBlobSize, long maxRecordsPerBlob) {
+
+    this.blobThreadPool = blobThreadPool;
+    this.metrics = metrics;
+    this.maxBlockFlushThresholdSize = maxBlockFlushThresholdSize;
+    this.flushTimeoutMs = flushTimeoutMs;
+    this.compression = compression;
+    this.containerAsyncClient = containerAsyncClient;
+    this.blobURLPrefix = blobURLPrefix;
+    this.useRandomStringInBlobName = useRandomStringInBlobName;
+    this.maxBlobSize = maxBlobSize;
+    this.maxRecordsPerBlob = maxRecordsPerBlob;
+  }
+
+  /**
+   * This method expects the {@link org.apache.samza.system.OutgoingMessageEnvelope}
+   * to contain a message which is an {@link org.apache.avro.generic.IndexedRecord} or an already encoded record as a byte[].
+   * If the record is already encoded, it writes the byte[] directly to the output stream without checking whether it conforms to the schema.
+   * Otherwise, it encodes the record and writes it to the output stream.
+   * However, the first envelope should always contain a record and not a byte[].
+   * If the block size threshold is crossed, it uploads the output stream contents as a block.
+   * If the number of records in the current blob or the size of the current blob exceeds the limits, then a new blob is created.
+   *
+   * Multi-threading and thread-safety:
+   * The underlying {@link org.apache.avro.file.DataFileWriter} is not thread-safe.
+   * For this reason, it is essential to wrap accesses to this object in a synchronized block.
+   * Method write(OutgoingMessageEnvelope) allows multiple threads to encode records, as that operation is stateless, but
+   * restricts access to the shared objects through the synchronized block.
+   * Concurrent access to shared objects is controlled through a common lock and synchronized block and hence ensures
+   * thread safety.
+   * @param ome - OutgoingMessageEnvelope that contains the IndexedRecord (GenericRecord or SpecificRecord) or an encoded record as byte[]
+   * @throws IOException when the underlying dataFileWriter.appendEncoded fails
+   * @throws IllegalArgumentException when the OutgoingMessageEnvelope's message is neither an IndexedRecord nor a byte[]
+   * @throws IllegalStateException when the first OutgoingMessageEnvelope's message is not a record.
+   */
+  @Override
+  public void write(OutgoingMessageEnvelope ome) throws IOException {
+    Optional<IndexedRecord> optionalIndexedRecord;
+    byte[] encodedRecord;
+    if (ome.getMessage() instanceof IndexedRecord) {
+      optionalIndexedRecord = Optional.of((IndexedRecord) ome.getMessage());
+      encodedRecord = encodeRecord((IndexedRecord) ome.getMessage());
+    } else if (ome.getMessage() instanceof byte[]) {
+      optionalIndexedRecord = Optional.empty();
+      encodedRecord = (byte[]) ome.getMessage();
+    } else {
+      throw new IllegalArgumentException("AzureBlobAvroWriter only supports IndexedRecord and byte[].");
+    }
+
+    synchronized (currentDataFileWriterLock) {
+      // if currentBlobWriterComponents is null, then it is the first blob of this AzureBlobAvroWriter object
+      if (currentBlobWriterComponents == null || willCurrentBlobExceedSize(encodedRecord) || willCurrentBlobExceedRecordLimit()) {
+        startNextBlob(optionalIndexedRecord);
+      }
+      currentBlobWriterComponents.dataFileWriter.appendEncoded(ByteBuffer.wrap(encodedRecord));
+      recordsInCurrentBlob++;
+    }
+  }
+
+  /**
+   * This method flushes all records written to dataFileWriter into the underlying AzureBlobOutputStream.
+   * dataFileWriter.flush then explicitly invokes flush of the AzureBlobOutputStream.
+   * This in turn asynchronously uploads the content of the output stream as a block and re-initializes the output stream.
+   * AzureBlobOutputStream.flush is not guaranteed to happen if dataFileWriter.flush fails.
+   * In such a scenario, the current block is not uploaded and the blocks uploaded so far are lost.
+   * {@inheritDoc}
+   * @throws IOException if the underlying dataFileWriter.flush fails
+   */
+  @Override
+  public void flush() throws IOException {
+    synchronized (currentDataFileWriterLock) {
+      currentBlobWriterComponents.dataFileWriter.flush();
+    }
+  }
+
+  /**
+   * This method closes all DataFileWriters and output streams associated with all the blobs created.
+   * flush should be explicitly called before close.
+   * {@inheritDoc}
+   * @throws IllegalStateException when closing a closed writer
+   * @throws SamzaException if underlying DataFileWriter.close fails
+   */
+  @Override
+  public void close() {
+    synchronized (currentDataFileWriterLock) {
+      if (isClosed) {
+        throw new IllegalStateException("Attempting to close an already closed AzureBlobAvroWriter");
+      }
+      allBlobWriterComponents.forEach(blobWriterComponents -> {
+        try {
+          closeDataFileWriter(blobWriterComponents.dataFileWriter, blobWriterComponents.azureBlobOutputStream,
+              blobWriterComponents.blockBlobAsyncClient);
+        } catch (IOException e) {
+          throw new SamzaException(e);
+        }
+      });
+      isClosed = true;
+    }
+  }
+
+  @VisibleForTesting
+  AzureBlobAvroWriter(BlobContainerAsyncClient containerAsyncClient, AzureBlobWriterMetrics metrics,
+      Executor blobThreadPool, int maxBlockFlushThresholdSize, int flushTimeoutMs, String blobURLPrefix,
+      DataFileWriter<IndexedRecord> dataFileWriter,
+      AzureBlobOutputStream azureBlobOutputStream, BlockBlobAsyncClient blockBlobAsyncClient,
+      long maxBlobSize, long maxRecordsPerBlob, Compression compression) {
+    if (dataFileWriter == null || azureBlobOutputStream == null || blockBlobAsyncClient == null) {
+      this.currentBlobWriterComponents = null;
+    } else {
+      this.currentBlobWriterComponents =
+          new BlobWriterComponents(dataFileWriter, azureBlobOutputStream, blockBlobAsyncClient);
+    }
+    this.allBlobWriterComponents.add(this.currentBlobWriterComponents);
+    this.blobThreadPool = blobThreadPool;
+    this.blobURLPrefix = blobURLPrefix;
+    this.metrics = metrics;
+    this.maxBlockFlushThresholdSize = maxBlockFlushThresholdSize;
+    this.flushTimeoutMs = flushTimeoutMs;
+    this.compression = compression;
+    this.containerAsyncClient = containerAsyncClient;
+    this.useRandomStringInBlobName = false;
+    this.maxBlobSize = maxBlobSize;
+    this.maxRecordsPerBlob = maxRecordsPerBlob;
+  }
+
+  @VisibleForTesting
+  byte[] encodeRecord(IndexedRecord record) {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    Schema schema = record.getSchema();
+    try {
+      EncoderFactory encoderfactory = new EncoderFactory();
+      BinaryEncoder encoder = encoderfactory.binaryEncoder(out, null);
+      DatumWriter<IndexedRecord> writer;
+      if (record instanceof SpecificRecord) {
+        writer = new SpecificDatumWriter<>(schema);
+      } else {
+        writer = new GenericDatumWriter<>(schema);
+      }
+      writer.write(record, encoder);
+      encoder.flush(); // encoder may buffer
+    } catch (Exception e) {
+      throw new SamzaException("Unable to serialize Avro record using schema within the record: " + schema.toString(), e);
+    }
+    return out.toByteArray();
+  }
+
+  private static SimpleDateFormat buildUTCFormatter() {
+    SimpleDateFormat formatter = new SimpleDateFormat(PUBLISHED_FILE_NAME_DATE_FORMAT);
+    formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
+    return formatter;
+  }
+
+  private void closeDataFileWriter(DataFileWriter dataFileWriter, AzureBlobOutputStream azureBlobOutputStream,
+      BlockBlobAsyncClient blockBlobAsyncClient) throws IOException {
+    try {
+      LOG.info("Closing the blob: {}", blockBlobAsyncClient.getBlobUrl().toString());
+      // dataFileWriter.close calls close of the azureBlobOutputStream associated with it.
+      dataFileWriter.close();
+    } catch (Exception e) {
+      // ensure that close is called even if dataFileWriter.close fails.
+      // This is to avoid loss of all the blocks uploaded for the blob,
+      // as commitBlockList happens in close of azureBlobOutputStream.
+      azureBlobOutputStream.close();
+      throw e;
+    }
+  }
+
+  private void startNextBlob(Optional<IndexedRecord> optionalIndexedRecord) throws IOException {
+    if (currentBlobWriterComponents != null) {
+      LOG.info("Starting new blob as current blob size is "
+          + currentBlobWriterComponents.azureBlobOutputStream.getSize()
+          + " and max blob size is " + maxBlobSize
+          + " or number of records is " + recordsInCurrentBlob
+          + " and max records in blob is " + maxRecordsPerBlob);
+      currentBlobWriterComponents.dataFileWriter.flush();
+      currentBlobWriterComponents.azureBlobOutputStream.releaseBuffer();
+      recordsInCurrentBlob = 0;
+    }
+    // datumWriter is null when AzureBlobAvroWriter is created but has not yet received a message.
+    // optionalIndexedRecord is the first message in this case.
+    if (datumWriter == null) {
+      if (optionalIndexedRecord.isPresent()) {
+        IndexedRecord record = optionalIndexedRecord.get();
+        schema = record.getSchema();
+        if (record instanceof SpecificRecord) {
+          datumWriter = new SpecificDatumWriter<>(schema);
+        } else {
+          datumWriter = new GenericDatumWriter<>(schema);
+        }
+      } else {
+        throw new IllegalStateException("Writing without schema setup.");
+      }
+    }
+    String blobURL;
+    if (useRandomStringInBlobName) {
+      blobURL = String.format(BLOB_NAME_RANDOM_STRING_AVRO, blobURLPrefix,
+          UTC_FORMATTER.format(System.currentTimeMillis()), blobNumber, UUID.randomUUID().toString().substring(0, 8),
+          compression.getFileExtension());
+    } else {
+      blobURL = String.format(BLOB_NAME_AVRO, blobURLPrefix,
+          UTC_FORMATTER.format(System.currentTimeMillis()), blobNumber, compression.getFileExtension());
+    }
+    LOG.info("Creating new blob: {}", blobURL);
+    BlockBlobAsyncClient blockBlobAsyncClient = containerAsyncClient.getBlobAsyncClient(blobURL).getBlockBlobAsyncClient();
+    blobNumber++;

Review comment:
   Could you please describe what the `blobNumber` is used for?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

With regards,
Apache Git Services
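Editor's note: for readers following the diff, below is a minimal, hypothetical sketch of the write/flush/close lifecycle the class javadoc describes. It is not code from this PR. The container client, metrics, compression, and executor are assumed to be supplied by the caller, and the system name ("azureblob"), stream name ("my-stream"), and all threshold values are illustrative placeholders only.

import java.io.IOException;
import java.util.concurrent.Executor;

import com.azure.storage.blob.BlobContainerAsyncClient;
import org.apache.avro.generic.IndexedRecord;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.azureblob.avro.AzureBlobAvroWriter;
import org.apache.samza.system.azureblob.compression.Compression;
import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;

public class AzureBlobAvroWriterUsageSketch {
  // Hypothetical helper: all dependencies are passed in; the numeric values are
  // illustrative, not recommendations from the PR.
  public static void writeRecords(BlobContainerAsyncClient containerClient,
      AzureBlobWriterMetrics metrics, Compression compression, Executor uploadExecutor,
      Iterable<IndexedRecord> records) throws IOException {
    AzureBlobAvroWriter writer = new AzureBlobAvroWriter(containerClient,
        "my-stream",          // blobURLPrefix
        uploadExecutor, metrics,
        10 * 1024 * 1024,     // maxBlockFlushThresholdSize: upload a block every ~10MB
        60000,                // flushTimeoutMs
        compression,
        true,                 // useRandomStringInBlobName, to avoid name collisions
        150 * 1024 * 1024,    // maxBlobSize: roll to a new blob past ~150MB
        100000);              // maxRecordsPerBlob
    try {
      // The first envelope must carry a decoded IndexedRecord so the writer can
      // capture the schema; later envelopes may also be pre-encoded byte[] payloads.
      for (IndexedRecord record : records) {
        writer.write(new OutgoingMessageEnvelope(new SystemStream("azureblob", "my-stream"), record));
      }
      writer.flush(); // uploads any buffered data as a block (assumes at least one record was written)
    } finally {
      writer.close(); // commits the block lists of all blobs; the writer cannot be reused afterwards
    }
  }
}

Reading the format strings in the diff, a blob created by this configuration would be named roughly my-stream/2020/01/06/22/30-45-0-1a2b3c4d.avro followed by the compression's file extension, where the timestamp comes from UTC_FORMATTER, the 0 is the blobNumber the review comment asks about, and 1a2b3c4d stands in for the 8-character random suffix.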
