exceptionfactory commented on a change in pull request #5254:
URL: https://github.com/apache/nifi/pull/5254#discussion_r681024731



##########
File path: 
nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java
##########
@@ -32,6 +33,11 @@
 
     File getKrb5File();
 
+    /**
+     * @return the directory to use for storing FlowFile Content, or an empty optional if content is to be stored in memory
+     */
+    Optional<File> getContentRepositoryStoragePath();

Review comment:
       Is there a reason for naming this method differently than the property 
name `content.repository.directory`?

##########
File path: 
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java
##########
@@ -174,10 +174,17 @@ public void start(final Map<String, String> properties) {
             partitionMap = clusterStatePartitionMap;
         }
 
-        for (final FlowFile flowFile : outputFlowFiles) {
-            final byte[] contents = triggerResult.readContent(flowFile);
-            final SourceRecord sourceRecord = createSourceRecord(flowFile, contents, componentState, partitionMap);
-            sourceRecords.add(sourceRecord);
+        try {
+            for (final FlowFile flowFile : outputFlowFiles) {
+                final byte[] contents = triggerResult.readContent(flowFile);
+                final SourceRecord sourceRecord = createSourceRecord(flowFile, contents, componentState, partitionMap);
+                sourceRecords.add(sourceRecord);
+            }
+        } catch (final Exception e) {
+            logger.error("Failed to obtain contents of Output FlowFiles in order to form Kafka Record", e);
+            triggerResult.abort(e);
+            failureYieldExpiration = System.currentTimeMillis() + 1000L; // delay next execution for 1 second to avoid constantly failing and utilizing huge amounts of resources

Review comment:
       One additional minor adjustment, the class uses the `1000L` delay in 
multiple places, so it might be helpful to promote it to a static final 
variable for naming clarity and reusability.
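A minimal sketch of the suggested refactoring, using a simplified stand-in class rather than the actual `StatelessNiFiSourceTask`. The constant name `FAILURE_YIELD_MILLIS` and the `onFailure`/`isYielding` helpers are hypothetical; only `failureYieldExpiration` and the one-second delay come from the diff above.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for StatelessNiFiSourceTask: only the
// failure-yield bookkeeping is modeled here
class FailureYieldSketch {
    // One named constant replaces the repeated 1000L literal
    static final long FAILURE_YIELD_MILLIS = TimeUnit.SECONDS.toMillis(1);

    private volatile long failureYieldExpiration = 0L;

    // Called after a failed trigger so that the next poll waits before retrying
    void onFailure(final long nowMillis) {
        failureYieldExpiration = nowMillis + FAILURE_YIELD_MILLIS;
    }

    // True while the task should skip work because of a recent failure
    boolean isYielding(final long nowMillis) {
        return nowMillis < failureYieldExpiration;
    }
}
```

Each call site then reads `System.currentTimeMillis() + FAILURE_YIELD_MILLIS`, so the delay is named once and changed in one place.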

##########
File path: 
nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardExecutionProgress.java
##########
@@ -144,14 +149,23 @@ public boolean isCanceled() {
             }
 
             @Override
-            public byte[] readContent(final FlowFile flowFile) {
+            public byte[] readContent(final FlowFile flowFile) throws IOException {
                 if (!(flowFile instanceof FlowFileRecord)) {
                     throw new IllegalArgumentException("FlowFile was not created by this flow");
                 }
 
                 final FlowFileRecord flowFileRecord = (FlowFileRecord) flowFile;
                 final ContentClaim contentClaim = flowFileRecord.getContentClaim();
-                final byte[] contentClaimContents = contentRepository.getBytes(contentClaim);
+
+                if (contentClaim.getLength() > Integer.MAX_VALUE) {
+                    throw new IOException("Cannot return contents of " + flowFile + " as a byte array because the contents are too large: " + contentClaim.getLength() + " bytes");

Review comment:
       What do you think about adjusting the message to indicate the reason 
more specifically?
   ```suggestion
                       throw new IOException(flowFile + " Content Length [" + contentClaim.getLength() + " bytes] exceeds maximum byte array supported length [" + Integer.MAX_VALUE + " bytes]");
   ```
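For reference, a minimal sketch of the length check with the suggested message, pulled into a hypothetical helper (`requireByteArrayLength` is not part of NiFi). The check only guarantees that the length fits in an `int`, which makes the cast safe; allocating the array can still fail with `OutOfMemoryError` for very large content.

```java
import java.io.IOException;

class ContentLengthCheckSketch {
    // Hypothetical helper: returns the length as an int, or throws with the suggested message
    static int requireByteArrayLength(final String flowFileDescription, final long contentLength) throws IOException {
        if (contentLength > Integer.MAX_VALUE) {
            throw new IOException(flowFileDescription + " Content Length [" + contentLength
                + " bytes] exceeds maximum byte array supported length [" + Integer.MAX_VALUE + " bytes]");
        }
        // Safe narrowing: the value is known to fit in an int at this point
        return (int) contentLength;
    }
}
```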

##########
File path: 
nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessFileSystemRepository.java
##########
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.repository;
+
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
+import org.apache.nifi.controller.repository.io.LimitedInputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.OpenOption;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class StatelessFileSystemRepository implements ContentRepository {
+    private static final Logger logger = LoggerFactory.getLogger(StatelessFileSystemRepository.class);
+
+    private static final String CONTAINER = "stateless";
+    private static final String SECTION = "stateless";
+
+    private final File directory;
+    private final ConcurrentMap<ResourceClaim, SynchronizedByteCountingOutputStream> writableStreamMap = new ConcurrentHashMap<>();
+    private final AtomicLong resourceClaimIndex = new AtomicLong(0L);
+    private final BlockingQueue<ResourceClaim> writableClaimQueue = new LinkedBlockingQueue<>();
+    private ResourceClaimManager resourceClaimManager;
+
+    public StatelessFileSystemRepository(final File directory) {
+        this.directory = directory;
+    }
+
+    @Override
+    public void initialize(final ResourceClaimManager claimManager) throws IOException {
+        this.resourceClaimManager = claimManager;
+        if (!directory.exists() && !directory.mkdirs()) {
+            throw new IOException("Cannot initialize Content Repository because " + directory.getAbsolutePath() + " does not exist and cannot be created");
+        }
+
+        // Check if there are any existing files and, if so, purge them.
+        final File[] existingFiles = directory.listFiles(file -> file.getName().matches("\\d+\\.nifi\\.bin"));

Review comment:
       The reason for the particular regular expression pattern is not 
immediately apparent, so it might be helpful to use a named static variable.
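A minimal sketch of that suggestion; the constant name `CONTENT_FILE_PATTERN` and the class around it are hypothetical. Precompiling the expression into a `Pattern` also avoids recompiling it for every file, which `String.matches()` does on each call.

```java
import java.io.File;
import java.io.IOException;
import java.util.regex.Pattern;

class ContentFileNaming {
    // Content files are named "<resource claim id>.nifi.bin", where the id is the
    // numeric value taken from resourceClaimIndex; the name documents that intent
    static final Pattern CONTENT_FILE_PATTERN = Pattern.compile("\\d+\\.nifi\\.bin");

    static boolean isContentFile(final File file) {
        return CONTENT_FILE_PATTERN.matcher(file.getName()).matches();
    }

    // Same null handling as initialize(): listFiles() returns null on I/O errors
    static File[] listContentFiles(final File directory) throws IOException {
        final File[] files = directory.listFiles(ContentFileNaming::isContentFile);
        if (files == null) {
            throw new IOException("Failed to list contents of directory " + directory.getAbsolutePath());
        }
        return files;
    }
}
```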

##########
File path: 
nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/TriggerResult.java
##########
@@ -57,11 +58,18 @@
      * Provides the contents of a FlowFile that was obtained by calling {@link #getOutputFlowFiles()}.
      * @param flowFile the FlowFile whose contents are to be read
      * @return the contents of the FlowFile
+     * @throws IOException if unable to read the contents of the FlowFile
      */
-    byte[] readContent(FlowFile flowFile);
+    byte[] readContent(FlowFile flowFile) throws IOException;
 
     /**
      * Acknowledges the output of the dataflow and allows the session to be successfully committed.
      */
     void acknowledge();
+
+    /**
+     * Aborts the dataflow
+     * @param cause the cause for aborting the dataflow

Review comment:
       Is it acceptable for the cause to be `null`? It might be worth stating 
one way or the other if there is a specific expectation.
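One way to make the expectation explicit, sketched under the assumption that a non-null cause is required (documenting `null` as allowed would be an equally valid choice). The class below is a hypothetical stand-in, not the actual `TriggerResult` implementation.

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for a TriggerResult implementation
class AbortableResultSketch {
    private volatile Throwable abortCause;

    /**
     * Aborts the dataflow.
     * @param cause the cause for aborting the dataflow; must not be {@code null}
     * @throws NullPointerException if {@code cause} is {@code null}
     */
    void abort(final Throwable cause) {
        // Enforce the documented contract at the boundary
        this.abortCause = Objects.requireNonNull(cause, "Abort cause required");
    }

    Throwable getAbortCause() {
        return abortCause;
    }
}
```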

##########
File path: 
nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/TriggerResult.java
##########
@@ -57,11 +58,18 @@
      * Provides the contents of a FlowFile that was obtained by calling {@link #getOutputFlowFiles()}.
      * @param flowFile the FlowFile whose contents are to be read
      * @return the contents of the FlowFile
+     * @throws IOException if unable to read the contents of the FlowFile
      */
-    byte[] readContent(FlowFile flowFile);
+    byte[] readContent(FlowFile flowFile) throws IOException;

Review comment:
       If throwing an `IOException` can only occur in the file system 
implementation, would it make sense to keep the existing method signature and 
change the implementing classes to wrap `IOException` in 
`UncheckedIOException`? It depends on how likely an `IOException` is to occur, 
and whether the caller should be expected to do anything useful with it. Just 
something to consider; the current implementation may well be the best approach.
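A minimal sketch of the alternative, assuming a hypothetical `ContentSource` abstraction in place of the content repository: the method keeps a signature without `throws IOException` and wraps failures in `UncheckedIOException`, leaving callers free to catch it or let it propagate. The manual copy loop avoids relying on Java 9's `InputStream.readAllBytes()`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical content source standing in for the file-system-backed repository
interface ContentSource {
    InputStream open() throws IOException;
}

class UncheckedReadSketch {
    // No "throws IOException" in the signature; I/O failures surface as
    // UncheckedIOException with the original IOException as the cause
    static byte[] readContent(final ContentSource source) {
        try (InputStream in = source.open()) {
            final ByteArrayOutputStream out = new ByteArrayOutputStream();
            final byte[] buffer = new byte[8192];
            int len;
            while ((len = in.read(buffer)) != -1) {
                out.write(buffer, 0, len);
            }
            return out.toByteArray();
        } catch (final IOException e) {
            throw new UncheckedIOException("Failed to read FlowFile content", e);
        }
    }
}
```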

##########
File path: 
nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/CloneFlowFileIT.java
##########
@@ -77,15 +77,26 @@ public void testClone() throws IOException, StatelessConfigurationException, Int
         assertEquals("123", second.getAttribute("abc"));
 
         final long countNormal = flowFiles.stream()
-            .filter(flowFile -> new String(result.readContent(flowFile), StandardCharsets.UTF_8).equals("Hello"))
+            .filter(flowFile -> readContentAsString(result, flowFile).equals("Hello"))
             .count();
 
         final long countReversed = flowFiles.stream()
-            .filter(flowFile -> new String(result.readContent(flowFile), StandardCharsets.UTF_8).equals("olleH"))
+            .filter(flowFile -> readContentAsString(result, flowFile).equals("olleH"))
             .count();
 
         assertEquals(1L, countNormal);
         assertEquals(1L, countReversed);
+
+        result.acknowledge();
     }
 
+    private String readContentAsString(final TriggerResult result, final FlowFile flowFile) {
+        try {
+            return new String(result.readContent(flowFile), StandardCharsets.UTF_8);
+        } catch (final IOException e) {
+            e.printStackTrace();
+            Assert.fail("Could not read content");
+            return null;

Review comment:
       This could be replaced with throwing an UncheckedIOException(), which 
would also cause the test method to fail.
   ```suggestion
               throw new UncheckedIOException("Could not read content", e);
   ```
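A self-contained sketch of why the helper is needed and how the suggestion behaves: `Predicate` lambdas passed to `filter()` cannot throw checked exceptions, so the helper converts `IOException` into `UncheckedIOException`, which escapes the stream and fails the calling test with the original exception attached as the cause. `ContentReader` and the string IDs are hypothetical stand-ins for `TriggerResult` and `FlowFile`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical reader standing in for TriggerResult.readContent(FlowFile),
// which now declares IOException
interface ContentReader {
    byte[] readContent(String flowFileId) throws IOException;
}

class ReadHelperSketch {
    // Converts the checked exception so the method can be called from a lambda
    static String readContentAsString(final ContentReader reader, final String flowFileId) {
        try {
            return new String(reader.readContent(flowFileId), StandardCharsets.UTF_8);
        } catch (final IOException e) {
            throw new UncheckedIOException("Could not read content", e);
        }
    }

    static long countMatching(final ContentReader reader, final List<String> ids, final String expected) {
        return ids.stream()
            .filter(id -> readContentAsString(reader, id).equals(expected))
            .count();
    }
}
```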

##########
File path: 
nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessFileSystemRepository.java
##########
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.repository;
+
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
+import org.apache.nifi.controller.repository.io.LimitedInputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.OpenOption;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class StatelessFileSystemRepository implements ContentRepository {
+    private static final Logger logger = LoggerFactory.getLogger(StatelessFileSystemRepository.class);
+
+    private static final String CONTAINER = "stateless";
+    private static final String SECTION = "stateless";
+
+    private final File directory;
+    private final ConcurrentMap<ResourceClaim, SynchronizedByteCountingOutputStream> writableStreamMap = new ConcurrentHashMap<>();
+    private final AtomicLong resourceClaimIndex = new AtomicLong(0L);
+    private final BlockingQueue<ResourceClaim> writableClaimQueue = new LinkedBlockingQueue<>();
+    private ResourceClaimManager resourceClaimManager;
+
+    public StatelessFileSystemRepository(final File directory) {
+        this.directory = directory;
+    }
+
+    @Override
+    public void initialize(final ResourceClaimManager claimManager) throws IOException {
+        this.resourceClaimManager = claimManager;
+        if (!directory.exists() && !directory.mkdirs()) {
+            throw new IOException("Cannot initialize Content Repository because " + directory.getAbsolutePath() + " does not exist and cannot be created");
+        }
+
+        // Check if there are any existing files and, if so, purge them.
+        final File[] existingFiles = directory.listFiles(file -> file.getName().matches("\\d+\\.nifi\\.bin"));
+        if (existingFiles == null) {
+            throw new IOException("Cannot initialize Content Repository because failed to list contents of directory " + directory.getAbsolutePath());
+        }
+
+        for (final File existingFile : existingFiles) {
+            logger.info("Found existing file from previous run {}. Removing file.", existingFile.getName());
+            final boolean deleted = existingFile.delete();
+
+            if (!deleted) {
+                logger.warn("Failed to remove existing file from previous run {}", existingFile);
+            }
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        purge();
+    }
+
+    @Override
+    public Set<String> getContainerNames() {
+        return Collections.singleton(CONTAINER);
+    }
+
+    @Override
+    public long getContainerCapacity(final String containerName) {
+        return 0;
+    }
+
+    @Override
+    public long getContainerUsableSpace(final String containerName) {
+        return 0;
+    }
+
+    @Override
+    public String getContainerFileStoreName(final String containerName) {
+        return "container";
+    }
+
+    @Override
+    public ContentClaim create(final boolean lossTolerant) throws IOException {
+        ResourceClaim resourceClaim = writableClaimQueue.poll();
+        long offset;
+
+        if (resourceClaim == null) {
+            resourceClaim = new StandardResourceClaim(resourceClaimManager, CONTAINER, SECTION, String.valueOf(resourceClaimIndex.getAndIncrement()), false);
+            offset = 0L;
+
+            final File resourceClaimFile = getFile(resourceClaim);
+            final OutputStream fos = new FileOutputStream(resourceClaimFile);
+            final SynchronizedByteCountingOutputStream contentOutputStream = new SynchronizedByteCountingOutputStream(fos);
+            writableStreamMap.put(resourceClaim, contentOutputStream);
+        } else {
+            final SynchronizedByteCountingOutputStream contentOutputStream = writableStreamMap.get(resourceClaim);
+            offset = contentOutputStream.getBytesWritten();
+        }
+
+        final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, offset);
+        resourceClaimManager.incrementClaimantCount(contentClaim.getResourceClaim());
+        return contentClaim;
+    }
+
+    @Override
+    public int incrementClaimaintCount(final ContentClaim claim) {
+        if (claim == null) {
+            return 0;
+        }
+
+        return resourceClaimManager.incrementClaimantCount(claim.getResourceClaim());
+    }
+
+    @Override
+    public int getClaimantCount(final ContentClaim claim) {
+        if (claim == null) {
+            return 0;
+        }
+
+        return resourceClaimManager.getClaimantCount(claim.getResourceClaim());
+    }
+
+    @Override
+    public int decrementClaimantCount(final ContentClaim claim) {
+        if (claim == null) {
+            return 0;
+        }
+
+        return resourceClaimManager.decrementClaimantCount(claim.getResourceClaim());
+    }
+
+    @Override
+    public boolean remove(final ContentClaim claim) {
+        return true;
+    }
+
+    @Override
+    public ContentClaim clone(final ContentClaim original, final boolean lossTolerant) throws IOException {
+        final ContentClaim clone = create(lossTolerant);
+        try (final InputStream in = read(original);
+             final OutputStream out = write(clone)) {
+            StreamUtils.copy(in, out);
+        }
+
+        return clone;
+    }
+
+    @Override
+    public long merge(final Collection<ContentClaim> claims, final ContentClaim destination, final byte[] header, final byte[] footer, final byte[] demarcator) {
+        throw new UnsupportedOperationException("This never gets used");
+    }
+
+    @Override
+    public long importFrom(final Path content, final ContentClaim claim) throws IOException {
+        try (final InputStream in = Files.newInputStream(content, StandardOpenOption.READ)) {
+            return importFrom(in, claim);
+        }
+    }
+
+    @Override
+    public long importFrom(final InputStream content, final ContentClaim claim) throws IOException {
+        try (final OutputStream out = write(claim)) {
+            return StreamUtils.copy(content, out);
+        }
+    }
+
+    @Override
+    public long exportTo(final ContentClaim claim, final Path destination, final boolean append) throws IOException {
+        final OpenOption[] openOptions = append ? new StandardOpenOption[] {StandardOpenOption.CREATE, StandardOpenOption.APPEND} :
+            new StandardOpenOption[] {StandardOpenOption.CREATE};
+
+        try (final OutputStream out = Files.newOutputStream(destination, openOptions)) {
+            return exportTo(claim, out);
+        }
+    }
+
+    @Override
+    public long exportTo(final ContentClaim claim, final Path destination, final boolean append, final long offset, final long length) throws IOException {
+        final OpenOption[] openOptions = append ? new StandardOpenOption[] {StandardOpenOption.CREATE, StandardOpenOption.APPEND} :
+            new StandardOpenOption[] {StandardOpenOption.CREATE};
+
+        try (final OutputStream out = Files.newOutputStream(destination, openOptions)) {
+            return exportTo(claim, out, offset, length);
+        }
+    }
+
+    @Override
+    public long exportTo(final ContentClaim claim, final OutputStream destination) throws IOException {
+        try (final InputStream in = read(claim)) {
+            return StreamUtils.copy(in, destination);
+        }
+    }
+
+    @Override
+    public long exportTo(final ContentClaim claim, final OutputStream destination, final long offset, final long length) throws IOException {
+        try (final InputStream in = read(claim)) {
+            StreamUtils.skip(in, offset);
+            StreamUtils.copy(in, destination, length);
+        }
+
+        return length;
+    }
+
+    @Override
+    public long size(final ContentClaim claim) {
+        return claim.getLength();
+    }
+
+    @Override
+    public InputStream read(final ContentClaim claim) throws IOException {
+        if (claim == null) {
+            return new ByteArrayInputStream(new byte[0]);
+        }
+
+        final InputStream resourceClaimIn = read(claim.getResourceClaim());
+        StreamUtils.skip(resourceClaimIn, claim.getOffset());
+
+        final InputStream limitedIn = new LimitedInputStream(resourceClaimIn, claim.getLength());
+        return limitedIn;
+    }
+
+    @Override
+    public InputStream read(final ResourceClaim claim) throws IOException {
+        validateResourceClaim(claim);
+        final File file = getFile(claim);
+        return new FileInputStream(file);
+    }
+
+    private File getFile(final ResourceClaim claim) {
+        return new File(directory, claim.getId() + ".nifi.bin");

Review comment:
       For clarity, what do you think about declaring a static file extension 
variable, then using `String.endsWith()` when searching for existing files in 
`initialize()`?
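A minimal sketch of this suggestion; `CONTENT_FILE_EXTENSION` and the helper methods are hypothetical names, and `getFile()` takes the claim ID as a plain `String` instead of a `ResourceClaim`. Note that `endsWith()` is looser than the current regular expression: it also matches names such as `abc.nifi.bin` that have a non-numeric prefix, which may or may not matter for the purge in `initialize()`.

```java
import java.io.File;

class ContentFileExtensionSketch {
    // Single definition of the suffix shared by getFile() and initialize()
    static final String CONTENT_FILE_EXTENSION = ".nifi.bin";

    // Mirrors getFile(ResourceClaim), with the claim id passed as a String
    static File getFile(final File directory, final String resourceClaimId) {
        return new File(directory, resourceClaimId + CONTENT_FILE_EXTENSION);
    }

    // Filter for initialize(): endsWith() replaces the regular expression
    static boolean isContentFile(final File file) {
        return file.getName().endsWith(CONTENT_FILE_EXTENSION);
    }
}
```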




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
