This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
     new eb8d4ee06f NIFI-13930 PutAzureDataLakeStorage sets close flag on file write so that Azure can emit FlushWithClose event
eb8d4ee06f is described below
commit eb8d4ee06f92f5ac9671040277177204e4f72be0
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Sat Oct 26 21:51:37 2024 +0200
    NIFI-13930 PutAzureDataLakeStorage sets close flag on file write so that Azure can emit FlushWithClose event
Signed-off-by: Pierre Villard <[email protected]>
This closes #9451.
---
.../azure/storage/PutAzureDataLakeStorage.java | 28 ++++++++++------------
1 file changed, 13 insertions(+), 15 deletions(-)
diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
index 5552fa3af1..8475e4bcec 100644
--- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
+++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
@@ -16,12 +16,14 @@
*/
package org.apache.nifi.processors.azure.storage;
+import com.azure.core.util.Context;
import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
+import com.azure.storage.file.datalake.options.DataLakeFileFlushOptions;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -225,21 +227,18 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
     private void uploadFile(final ProcessSession session, final FlowFile flowFile, final Optional<FileResource> fileResourceFound,
                             final long transferSize, final DataLakeFileClient fileClient) throws Exception {
- if (transferSize > 0) {
- try (final InputStream inputStream = new BufferedInputStream(
- fileResourceFound.map(FileResource::getInputStream)
- .orElseGet(() -> session.read(flowFile)))
- ) {
- uploadContent(fileClient, inputStream, transferSize);
- } catch (final Exception e) {
- removeFile(fileClient);
- throw e;
- }
+ try (final InputStream inputStream = new BufferedInputStream(
+ fileResourceFound.map(FileResource::getInputStream)
+ .orElseGet(() -> session.read(flowFile)))
+ ) {
+ uploadContent(fileClient, inputStream, transferSize);
+ } catch (final Exception e) {
+ removeFile(fileClient);
+ throw e;
}
}
-    //Visible for testing
-    static void uploadContent(final DataLakeFileClient fileClient, final InputStream in, final long length) throws IOException {
+    private static void uploadContent(final DataLakeFileClient fileClient, final InputStream in, final long length) throws IOException {
long chunkStart = 0;
long chunkSize;
@@ -258,8 +257,7 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
chunkStart += chunkSize;
}
-        // use overwrite mode due to https://github.com/Azure/azure-sdk-for-java/issues/31248
-        fileClient.flush(length, true);
+        fileClient.flushWithResponse(length, new DataLakeFileFlushOptions().setClose(true), null, Context.NONE);
}
/**
@@ -272,7 +270,7 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
     * @return the file client of the uploaded file or {@code null} if the file already exists and conflict resolution strategy is 'ignore'
     * @throws ProcessException if the file already exists and the conflict resolution strategy is 'fail'; also in case of other errors
*/
-    DataLakeFileClient createFile(DataLakeDirectoryClient directoryClient, final String fileName, final String conflictResolution) {
+    private DataLakeFileClient createFile(DataLakeDirectoryClient directoryClient, final String fileName, final String conflictResolution) {
         final String destinationPath = createPath(directoryClient.getDirectoryPath(), fileName);
try {