Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2876#discussion_r202902757
  
    --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java ---
    @@ -0,0 +1,322 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processor.util.list;
    +
    +import com.fasterxml.jackson.core.type.TypeReference;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.distributed.cache.client.Deserializer;
    +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
    +import org.apache.nifi.distributed.cache.client.Serializer;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.util.StringUtils;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +import java.util.function.Consumer;
    +import java.util.function.Function;
    +import java.util.function.Supplier;
    +import java.util.stream.Collectors;
    +import java.util.zip.GZIPInputStream;
    +import java.util.zip.GZIPOutputStream;
    +
    +import static java.lang.String.format;
    +import static org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
    +
    +public class ListedEntityTracker<T extends ListableEntity> {
    +
    +    private ObjectMapper objectMapper = new ObjectMapper();
    +    private volatile Map<String, ListedEntity> alreadyListedEntities;
    +
    +    private static final String NOTE = "Used by 'Tracking Entities' strategy.";
    +    public static final PropertyDescriptor TRACKING_STATE_CACHE = new PropertyDescriptor.Builder()
    +            .name("et-state-cache")
    +            .displayName("Entity Tracking State Cache")
    +            .description(format("Listed entities are stored in the specified cache storage" +
    +                    " so that this processor can resume listing across NiFi restart or in case of primary node change." +
    +                    " 'Tracking Entities' strategy require tracking information of all listed entities within the last 'Tracking Time Window'." +
    +                    " To support large number of entities, the strategy uses DistributedMapCache instead of managed state." +
    +                    " Cache key format is 'ListedEntityTracker::{processorId}(::{nodeId})'." +
    +                    " If it tracks per node listed entities, then the optional '::{nodeId}' part is added to manage state separately." +
    +                    " E.g. cluster wide cache key = 'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b'," +
    +                    " per node cache key = 'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b::nifi-node3'" +
    +                    " The stored cache content is Gzipped JSON string." +
    +                    " The cache key will be deleted when target listing configuration is changed." +
    +                    " %s", NOTE))
    +            .identifiesControllerService(DistributedMapCacheClient.class)
    +            .build();
    +
    +    public static final PropertyDescriptor TRACKING_TIME_WINDOW = new PropertyDescriptor.Builder()
    +            .name("et-time-window")
    +            .displayName("Entity Tracking Time Window")
    +            .description(format("Specify how long this processor should track already-listed entities." +
    +                    " 'Tracking Entities' strategy can pick any entity whose timestamp is inside the specified time window." +
    +                    " For example, if set to '30 minutes', any entity having timestamp in recent 30 minutes will be the listing target when this processor runs." +
    +                    " A listed entity is considered 'new/updated' and a FlowFile is emitted if one of following condition meets:" +
    +                    " 1. does not exist in the already-listed entities," +
    +                    " 2. has newer timestamp than the cached entity," +
    +                    " 3. has different size than the cached entity." +
    +                    " If a cached entity's timestamp becomes older than specified time window, that entity will be removed from the cached already-listed entities." +
    +                    " %s", NOTE))
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .defaultValue("3 hours")
    +            .build();
    +
    +    private static final AllowableValue INITIAL_LISTING_TARGET_ALL = new AllowableValue("all", "All Available",
    +            "Regardless of entities timestamp, all existing entities will be listed at the initial listing activity.");
    +    private static final AllowableValue INITIAL_LISTING_TARGET_WINDOW = new AllowableValue("window", "Tracking Time Window",
    +            "Ignore entities having timestamp older than the specified 'Tracking Time Window' at the initial listing activity.");
    +
    +    public static final PropertyDescriptor INITIAL_LISTING_TARGET = new PropertyDescriptor.Builder()
    +            .name("et-initial-listing-target")
    +            .displayName("Entity Tracking Initial Listing Target")
    +            .description(format("Specify how initial listing should be handled." +
    +                    " %s", NOTE))
    +            .allowableValues(INITIAL_LISTING_TARGET_WINDOW, INITIAL_LISTING_TARGET_ALL)
    +            .defaultValue(INITIAL_LISTING_TARGET_WINDOW.getValue())
    +            .build();
    +
    +    public static final PropertyDescriptor NODE_IDENTIFIER = new PropertyDescriptor.Builder()
    +            .name("et-node-identifier")
    +            .displayName("Entity Tracking Node Identifier")
    +            .description(format("The configured value will be appended to the cache key" +
    +                    " so that listing state can be tracked per NiFi node rather than cluster wide" +
    +                    " when tracking state is scoped to LOCAL. %s", NOTE))
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .defaultValue("${hostname()}")
    +            .build();
    +
    +    private static Supplier<Long> CURRENT_TIMESTAMP = System::currentTimeMillis;
    +
    +    private Serializer<String> stringSerializer = (v, o) -> o.write(v.getBytes(StandardCharsets.UTF_8));
    +    private Serializer<Map<String, ListedEntity>> listedEntitiesSerializer
    +            = (v, o) -> objectMapper.writeValue(new GZIPOutputStream(o), v);
    --- End diff --
    
    Thanks for catching this. I switched to NiFi's GZIPOutputStream. As for
closing the stream, I don't think we can close the output stream passed to the
serializer, because the caller may still use it to write additional bytes. None
of the current implementations do that, but we can't be certain that closing
the stream is safe in this context.
    
    GZIPOutputStream has a `finish` method that finishes writing the Gzip data
without closing the underlying stream, so the serializer now calls `finish`.
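
    For illustration, a minimal sketch of that behavior. It assumes the JDK's `java.util.zip.GZIPOutputStream` rather than NiFi's own class, and the names `GzipFinishSketch`, `serialize`, `CloseTrackingStream` and `decompress` are hypothetical helpers, not code from this PR. It only shows that `finish` completes the Gzip data while the caller's stream stays open and writable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipFinishSketch {

    /** Hypothetical stand-in for a serializer: writes Gzip data but leaves 'out' open. */
    static void serialize(String value, OutputStream out) throws IOException {
        GZIPOutputStream gzip = new GZIPOutputStream(out);
        gzip.write(value.getBytes(StandardCharsets.UTF_8));
        // finish() writes the remaining compressed data and the Gzip trailer
        // to 'out' without closing it, so the caller can keep writing.
        gzip.finish();
    }

    /** Test double that records whether close() was ever called. */
    static class CloseTrackingStream extends ByteArrayOutputStream {
        boolean closed = false;

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    /** Decompresses a complete Gzip byte array back into a UTF-8 string. */
    static String decompress(byte[] gzipped) throws IOException {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(gzipped))) {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            byte[] chunk = new byte[1024];
            int n;
            while ((n = in.read(chunk)) != -1) {
                buf.write(chunk, 0, n);
            }
            return new String(buf.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        CloseTrackingStream out = new CloseTrackingStream();
        serialize("hello", out);
        int gzipLength = out.size();

        // The caller can still append its own bytes after the serializer returns.
        out.write("TAIL".getBytes(StandardCharsets.UTF_8));

        byte[] all = out.toByteArray();
        System.out.println("closed by serializer: " + out.closed);
        System.out.println("decompressed: " + decompress(Arrays.copyOf(all, gzipLength)));
    }
}
```

    Calling `close` on the GZIPOutputStream instead would also close the stream handed in by the caller, which is the case the comment above wants to avoid.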

