[
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546074#comment-16546074
]
ASF GitHub Bot commented on NIFI-5406:
--------------------------------------
Github user ijokarumawak commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2876#discussion_r202902757
--- Diff:
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import static java.lang.String.format;
+import static
org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
+
+public class ListedEntityTracker<T extends ListableEntity> {
+
+ private ObjectMapper objectMapper = new ObjectMapper();
+ private volatile Map<String, ListedEntity> alreadyListedEntities;
+
+ private static final String NOTE = "Used by 'Tracking Entities'
strategy.";
+ public static final PropertyDescriptor TRACKING_STATE_CACHE = new
PropertyDescriptor.Builder()
+ .name("et-state-cache")
+ .displayName("Entity Tracking State Cache")
+ .description(format("Listed entities are stored in the
specified cache storage" +
+ " so that this processor can resume listing across
NiFi restart or in case of primary node change." +
+ " 'Tracking Entities' strategy require tracking
information of all listed entities within the last 'Tracking Time Window'." +
+ " To support large number of entities, the strategy
uses DistributedMapCache instead of managed state." +
+ " Cache key format is
'ListedEntityTracker::{processorId}(::{nodeId})'." +
+ " If it tracks per node listed entities, then the
optional '::{nodeId}' part is added to manage state separately." +
+ " E.g. cluster wide cache key =
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b'," +
+ " per node cache key =
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b::nifi-node3'" +
+ " The stored cache content is Gzipped JSON string." +
+ " The cache key will be deleted when target listing
configuration is changed." +
+ " %s", NOTE))
+ .identifiesControllerService(DistributedMapCacheClient.class)
+ .build();
+
+ public static final PropertyDescriptor TRACKING_TIME_WINDOW = new
PropertyDescriptor.Builder()
+ .name("et-time-window")
+ .displayName("Entity Tracking Time Window")
+ .description(format("Specify how long this processor should
track already-listed entities." +
+ " 'Tracking Entities' strategy can pick any entity
whose timestamp is inside the specified time window." +
+ " For example, if set to '30 minutes', any entity
having timestamp in recent 30 minutes will be the listing target when this
processor runs." +
+ " A listed entity is considered 'new/updated' and a
FlowFile is emitted if one of following condition meets:" +
+ " 1. does not exist in the already-listed entities," +
+ " 2. has newer timestamp than the cached entity," +
+ " 3. has different size than the cached entity." +
+ " If a cached entity's timestamp becomes older than
specified time window, that entity will be removed from the cached
already-listed entities." +
+ " %s", NOTE))
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .defaultValue("3 hours")
+ .build();
+
+ private static final AllowableValue INITIAL_LISTING_TARGET_ALL = new
AllowableValue("all", "All Available",
+ "Regardless of entities timestamp, all existing entities will
be listed at the initial listing activity.");
+ private static final AllowableValue INITIAL_LISTING_TARGET_WINDOW =
new AllowableValue("window", "Tracking Time Window",
+ "Ignore entities having timestamp older than the specified
'Tracking Time Window' at the initial listing activity.");
+
+ public static final PropertyDescriptor INITIAL_LISTING_TARGET = new
PropertyDescriptor.Builder()
+ .name("et-initial-listing-target")
+ .displayName("Entity Tracking Initial Listing Target")
+ .description(format("Specify how initial listing should be
handled." +
+ " %s", NOTE))
+ .allowableValues(INITIAL_LISTING_TARGET_WINDOW,
INITIAL_LISTING_TARGET_ALL)
+ .defaultValue(INITIAL_LISTING_TARGET_WINDOW.getValue())
+ .build();
+
+ public static final PropertyDescriptor NODE_IDENTIFIER = new
PropertyDescriptor.Builder()
+ .name("et-node-identifier")
+ .displayName("Entity Tracking Node Identifier")
+ .description(format("The configured value will be appended to
the cache key" +
+ " so that listing state can be tracked per NiFi node
rather than cluster wide" +
+ " when tracking state is scoped to LOCAL. %s", NOTE))
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .defaultValue("${hostname()}")
+ .build();
+
+ private static Supplier<Long> CURRENT_TIMESTAMP =
System::currentTimeMillis;
+
+ private Serializer<String> stringSerializer = (v, o) ->
o.write(v.getBytes(StandardCharsets.UTF_8));
+ private Serializer<Map<String, ListedEntity>> listedEntitiesSerializer
+ = (v, o) -> objectMapper.writeValue(new GZIPOutputStream(o),
v);
--- End diff --
Thanks for catching this. I switched to using NiFi's GZIPOutputStream. As for
closing the stream, I don't think we can close the output stream passed to the
serializer, because the stream may be used by the caller to write additional
bytes. None of the current implementations does that, but we can't be certain
whether it's fine to close the stream here in this context.
GZIPOutputStream has a `finish` method that finishes writing the Gzip data without
closing the underlying stream, so I added a call to `finish` in the serializer.
> Add new listing strategy by tracking listed entities to ListXXXX processors
> ---------------------------------------------------------------------------
>
> Key: NIFI-5406
> URL: https://issues.apache.org/jira/browse/NIFI-5406
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Koji Kawamura
> Assignee: Koji Kawamura
> Priority: Major
>
> The current implementation of the List processors (ListFile, ListFTP,
> ListSFTP, etc.) relies on a file's last-modified timestamp to pick new or
> updated files. This approach is efficient and lightweight in terms of state
> management, because it only tracks the latest modified timestamp and the last
> executed timestamp. However, timestamps do not work as expected in some file
> systems, causing List processors to miss files periodically. See the
> NIFI-3332 comments for details.
> In order to pick every entity that has not been seen before or has been
> updated since it was last seen, we need another set of processors using
> a different approach, namely tracking listed entities:
> * Add new abstract processor AbstractWatchEntries similar to
> AbstractListProcessor but uses different approach
> * Target entities have: name (path), size and last-modified-timestamp
> * Implementation Processors have following properties:
> ** 'Watch Time Window' to limit the maximum time period to hold the already
> listed entries. E.g. if set as '30min', the processor keeps entities listed
> in the last 30 mins.
> ** 'Minimum File Age' to defer listing entities potentially being written
> * Any entity that was added but never listed, and whose
> last-modified-timestamp is older than the configured 'Watch Time Window',
> will not be listed. If users need to pick up these items, they have to make
> 'Watch Time Window' longer, which also increases the size of the data the
> processor has to persist in the K/V store. This is an efficiency vs
> reliability trade-off.
> * The already-listed entities are persisted into one of the supported K/V
> stores through the DistributedMapCacheClient service. Users can choose which
> KVS to use from HBase, Redis, Couchbase and File (DistributedMapCacheServer
> with persistence file).
> * The reason for using a KVS instead of ManagedState is to avoid hammering
> Zookeeper by frequently updating a Zk node with a large amount of
> data. The number of already-listed entries can be huge depending on
> use-cases. Also, we can compress entities with DistributedMapCacheClient as
> it supports putting byte arrays, while ManagedState only supports Map<String,
> String>.
> * On each onTrigger:
> ** Processor performs listing. Listed entries meeting any of the following
> conditions will be written to the 'success' output FlowFile:
> *** Does not exist in the already-listed entities
> *** Has a newer last-modified-timestamp
> *** Has a different size
> ** Already-listed entries that are old enough compared to 'Watch Time
> Window' are discarded from the already-listed entries.
> * Initial supporting target is Local file system, FTP and SFTP
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)