Repository: nifi
Updated Branches:
  refs/heads/master 11fd67cd1 -> 4d21f9b34


NIFI-5709, NIFI-5710: Addressed issue that causes NiFi to not be able to read 
provenance events when a new FlowFile is created and then auto-terminated 
in the same session; minor bug fixes outlined in NIFI-5710

This closes #3083


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4d21f9b3
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4d21f9b3
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4d21f9b3

Branch: refs/heads/master
Commit: 4d21f9b34eb254014ba1e4a997417236c914fd08
Parents: 11fd67c
Author: Mark Payne <[email protected]>
Authored: Tue Oct 16 14:27:29 2018 -0400
Committer: Matt Gilman <[email protected]>
Committed: Tue Oct 16 15:25:29 2018 -0400

----------------------------------------------------------------------
 .../nifi/cluster/manager/StatusMerger.java      |  3 +-
 .../server/StandardLoadBalanceProtocol.java     |  5 +-
 .../repository/StandardRepositoryRecord.java    | 26 +++++++---
 .../TestStandardRepositoryRecord.java           | 54 ++++++++++++++++++++
 .../org/apache/nifi/web/api/dto/DtoFactory.java |  6 +++
 5 files changed, 86 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4d21f9b3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
index b44f8d6..0043ca0 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
@@ -61,6 +61,7 @@ import 
org.apache.nifi.web.api.entity.RemoteProcessGroupStatusSnapshotEntity;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -731,7 +732,7 @@ public class StatusMerger {
             gcDiagnosticsDto.setMemoryManagerName(memoryManagerName);
 
             final List<GCDiagnosticsSnapshotDTO> gcDiagnosticsSnapshots = new 
ArrayList<>(snapshotMap.values());
-            Collections.sort(gcDiagnosticsSnapshots, (a, b) -> 
a.getTimestamp().compareTo(b.getTimestamp()));
+            
gcDiagnosticsSnapshots.sort(Comparator.comparing(GCDiagnosticsSnapshotDTO::getTimestamp).reversed());
 
             gcDiagnosticsDto.setSnapshots(gcDiagnosticsSnapshots);
             gcDiagnosticsDtos.add(gcDiagnosticsDto);

http://git-wip-us.apache.org/repos/asf/nifi/blob/4d21f9b3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
index d6beff3..0f032df 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
@@ -68,6 +68,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.zip.CRC32;
@@ -465,6 +466,7 @@ public class StandardLoadBalanceProtocol implements 
LoadBalanceProtocol {
         }
 
         final Map<String, String> attributes = readAttributes(metadataIn);
+        final String sourceSystemUuid = 
attributes.get(CoreAttributes.UUID.key());
 
         logger.debug("Received Attributes {} from Peer {}", attributes, 
peerDescription);
 
@@ -476,6 +478,7 @@ public class StandardLoadBalanceProtocol implements 
LoadBalanceProtocol {
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
             .id(flowFileRepository.getNextFlowFileSequence())
             .addAttributes(attributes)
+            .addAttribute(CoreAttributes.UUID.key(), 
UUID.randomUUID().toString())
             .contentClaim(contentClaimTriple.getContentClaim())
             .contentClaimOffset(contentClaimTriple.getClaimOffset())
             .size(contentClaimTriple.getContentLength())
@@ -484,7 +487,7 @@ public class StandardLoadBalanceProtocol implements 
LoadBalanceProtocol {
             .build();
 
         logger.debug("Received FlowFile {} with {} attributes and {} bytes of 
content", flowFileRecord, attributes.size(), 
contentClaimTriple.getContentLength());
-        return new 
RemoteFlowFileRecord(attributes.get(CoreAttributes.UUID.key()), flowFileRecord);
+        return new RemoteFlowFileRecord(sourceSystemUuid, flowFileRecord);
     }
 
     private Map<String, String> readAttributes(final DataInputStream in) 
throws IOException {

http://git-wip-us.apache.org/repos/asf/nifi/blob/4d21f9b3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
index c960902..4aeb473 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
@@ -48,7 +48,7 @@ public class StandardRepositoryRecord implements 
RepositoryRecord {
      */
     public StandardRepositoryRecord(final FlowFileQueue originalQueue) {
         this(originalQueue, null);
-        this.type = RepositoryRecordType.CREATE;
+        setType(RepositoryRecordType.CREATE);
     }
 
     /**
@@ -59,13 +59,13 @@ public class StandardRepositoryRecord implements 
RepositoryRecord {
      */
     public StandardRepositoryRecord(final FlowFileQueue originalQueue, final 
FlowFileRecord originalFlowFileRecord) {
         this(originalQueue, originalFlowFileRecord, null);
-        this.type = RepositoryRecordType.UPDATE;
+        setType(RepositoryRecordType.UPDATE);
     }
 
     public StandardRepositoryRecord(final FlowFileQueue originalQueue, final 
FlowFileRecord originalFlowFileRecord, final String swapLocation) {
         this.originalQueue = originalQueue;
         this.originalFlowFileRecord = originalFlowFileRecord;
-        this.type = RepositoryRecordType.SWAP_OUT;
+        setType(RepositoryRecordType.SWAP_OUT);
         this.swapLocation = swapLocation;
         this.originalAttributes = originalFlowFileRecord == null ? 
Collections.emptyMap() : originalFlowFileRecord.getAttributes();
     }
@@ -96,7 +96,7 @@ public class StandardRepositoryRecord implements 
RepositoryRecord {
     public void setSwapLocation(final String swapLocation) {
         this.swapLocation = swapLocation;
         if (type != RepositoryRecordType.SWAP_OUT) {
-            type = RepositoryRecordType.SWAP_IN; // we are swapping in a new 
record
+            setType(RepositoryRecordType.SWAP_IN); // we are swapping in a new 
record
         }
     }
 
@@ -159,7 +159,7 @@ public class StandardRepositoryRecord implements 
RepositoryRecord {
     }
 
     public void markForAbort() {
-        type = RepositoryRecordType.CONTENTMISSING;
+        setType(RepositoryRecordType.CONTENTMISSING);
     }
 
     @Override
@@ -168,7 +168,7 @@ public class StandardRepositoryRecord implements 
RepositoryRecord {
     }
 
     public void markForDelete() {
-        type = RepositoryRecordType.DELETE;
+        setType(RepositoryRecordType.DELETE);
     }
 
     public boolean isMarkedForDelete() {
@@ -222,6 +222,20 @@ public class StandardRepositoryRecord implements 
RepositoryRecord {
         return updatedAttributes == null ? Collections.emptyMap() : 
updatedAttributes;
     }
 
+    private void setType(final RepositoryRecordType newType) {
+        if (newType == this.type) {
+            return;
+        }
+
+        if (this.type == RepositoryRecordType.CREATE) {
+            // Because we don't copy updated attributes to 
`this.updatedAttributes` for CREATE records, we need to ensure
+            // that if a record is changed from CREATE to anything else that 
we do properly update the `this.updatedAttributes` field.
+            this.updatedAttributes = new 
HashMap<>(getCurrent().getAttributes());
+        }
+
+        this.type = newType;
+    }
+
     @Override
     public String toString() {
         return "StandardRepositoryRecord[UpdateType=" + getType() + ",Record=" 
+ getCurrent() + "]";

http://git-wip-us.apache.org/repos/asf/nifi/blob/4d21f9b3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/TestStandardRepositoryRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/TestStandardRepositoryRecord.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/TestStandardRepositoryRecord.java
new file mode 100644
index 0000000..61e23fe
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/TestStandardRepositoryRecord.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestStandardRepositoryRecord {
+
+    @Test
+    public void testUpdatedAttributesMaintainedWhenFlowFileRemoved() {
+        final StandardRepositoryRecord record = new 
StandardRepositoryRecord(null);
+
+        final Map<String, String> updatedAttributes = new HashMap<>();
+        updatedAttributes.put("abc", "xyz");
+        updatedAttributes.put("hello", "123");
+
+        final String uuid = UUID.randomUUID().toString();
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+            .addAttribute("uuid", uuid)
+            .addAttributes(updatedAttributes)
+            .build();
+
+        record.setWorking(flowFileRecord, updatedAttributes);
+
+        final Map<String, String> updatedWithId = new 
HashMap<>(updatedAttributes);
+        updatedWithId.put("uuid", uuid);
+
+        assertEquals(updatedWithId, record.getUpdatedAttributes());
+
+        record.markForDelete();
+
+        assertEquals(updatedWithId, record.getUpdatedAttributes());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/4d21f9b3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 9020ae0..1393608 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -3292,6 +3292,10 @@ public final class DtoFactory {
             }
 
             final String serviceId = entry.getValue();
+            if (serviceId == null) {
+                continue;
+            }
+
             final ControllerServiceNode serviceNode = 
serviceProvider.getControllerServiceNode(serviceId);
             if (serviceNode == null) {
                 continue;
@@ -3573,6 +3577,8 @@ public final class DtoFactory {
                 gcSnapshots.add(snapshotDto);
             }
 
+            
gcSnapshots.sort(Comparator.comparing(GCDiagnosticsSnapshotDTO::getTimestamp).reversed());
+
             final GarbageCollectionDiagnosticsDTO gcDto = new 
GarbageCollectionDiagnosticsDTO();
             gcDto.setMemoryManagerName(memoryManager);
             gcDto.setSnapshots(gcSnapshots);

Reply via email to