This is an automated email from the ASF dual-hosted git repository.
aichrist pushed a commit to branch analytics-framework
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/analytics-framework by this
push:
new e5738fd NIFI-6510 Extract out predictions into their own DTO
e5738fd is described below
commit e5738fd557bcd43b9c3fc3b555dd43bfb66c387e
Author: Andrew I. Christianson <[email protected]>
AuthorDate: Thu Sep 5 10:07:11 2019 -0400
NIFI-6510 Extract out predictions into their own DTO
---
.../nifi/controller/status/ConnectionStatus.java | 108 +++----------------
.../analytics/ConnectionStatusPredictions.java | 119 +++++++++++++++++++++
.../ConnectionStatusPredictionsSnapshotDTO.java | 112 +++++++++++++++++++
.../dto/status/ConnectionStatusSnapshotDTO.java | 97 +++--------------
.../apache/nifi/cluster/manager/StatusMerger.java | 70 ++++++------
.../apache/nifi/reporting/StandardEventAccess.java | 22 ++--
.../org/apache/nifi/web/api/dto/DtoFactory.java | 51 ++++-----
7 files changed, 326 insertions(+), 253 deletions(-)
diff --git
a/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java
b/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java
index 5c6189a..fa0e6c3 100644
---
a/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java
+++
b/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.controller.status;
+import org.apache.nifi.controller.status.analytics.ConnectionStatusPredictions;
import org.apache.nifi.processor.DataUnit;
/**
@@ -30,7 +31,7 @@ public class ConnectionStatus implements Cloneable {
private String destinationId;
private String destinationName;
private String backPressureDataSizeThreshold;
- private Boolean predictionsAvailable;
+ private ConnectionStatusPredictions predictions;
private long backPressureBytesThreshold;
private long backPressureObjectThreshold;
private int inputCount;
@@ -41,13 +42,6 @@ public class ConnectionStatus implements Cloneable {
private long outputBytes;
private int maxQueuedCount;
private long maxQueuedBytes;
- private long predictionIntervalMillis;
- private int nextPredictedQueuedCount;
- private long nextPredictedQueuedBytes;
- private long predictedTimeToCountBackpressureMillis;
- private long predictedTimeToBytesBackpressureMillis;
- private int predictedPercentCount = 0;
- private int predictedPercentBytes = 0;
public String getId() {
return id;
@@ -130,20 +124,20 @@ public class ConnectionStatus implements Cloneable {
setBackPressureBytesThreshold(DataUnit.parseDataSize(backPressureDataSizeThreshold,
DataUnit.B).longValue());
}
- public long getBackPressureObjectThreshold() {
- return backPressureObjectThreshold;
+ public ConnectionStatusPredictions getPredictions() {
+ return predictions;
}
- public void setBackPressureObjectThreshold(long
backPressureObjectThreshold) {
- this.backPressureObjectThreshold = backPressureObjectThreshold;
+ public void setPredictions(ConnectionStatusPredictions predictions) {
+ this.predictions = predictions;
}
- public Boolean getPredictionsAvailable() {
- return predictionsAvailable;
+ public long getBackPressureObjectThreshold() {
+ return backPressureObjectThreshold;
}
- public void setPredictionsAvailable(Boolean predictionsAvailable) {
- this.predictionsAvailable = predictionsAvailable;
+ public void setBackPressureObjectThreshold(long
backPressureObjectThreshold) {
+ this.backPressureObjectThreshold = backPressureObjectThreshold;
}
public long getInputBytes() {
@@ -202,63 +196,6 @@ public class ConnectionStatus implements Cloneable {
this.backPressureBytesThreshold = backPressureBytesThreshold;
}
- public long getPredictionIntervalMillis() {
- return predictionIntervalMillis;
- }
-
- public void setPredictionIntervalMillis(long predictionIntervalMillis) {
- this.predictionIntervalMillis = predictionIntervalMillis;
- }
-
- public int getNextPredictedQueuedCount() {
- return nextPredictedQueuedCount;
- }
-
- public void setNextPredictedQueuedCount(int nextPredictedQueuedCount) {
- this.nextPredictedQueuedCount = nextPredictedQueuedCount;
- }
-
- public long getNextPredictedQueuedBytes() {
- return nextPredictedQueuedBytes;
- }
-
- public void setNextPredictedQueuedBytes(long nextPredictedQueuedBytes) {
- this.nextPredictedQueuedBytes = nextPredictedQueuedBytes;
- }
-
- public long getPredictedTimeToCountBackpressureMillis() {
- return predictedTimeToCountBackpressureMillis;
- }
-
- public void setPredictedTimeToCountBackpressureMillis(long
predictedTimeToCountBackpressureMillis) {
- this.predictedTimeToCountBackpressureMillis =
predictedTimeToCountBackpressureMillis;
- }
-
- public long getPredictedTimeToBytesBackpressureMillis() {
- return predictedTimeToBytesBackpressureMillis;
- }
-
- public void setPredictedTimeToBytesBackpressureMillis(long
predictedTimeToBytesBackpressureMillis) {
- this.predictedTimeToBytesBackpressureMillis =
predictedTimeToBytesBackpressureMillis;
- }
-
- public int getPredictedPercentCount() {
- return predictedPercentCount;
- }
-
- public void setPredictedPercentCount(int predictedPercentCount) {
- this.predictedPercentCount = predictedPercentCount;
- }
-
- public int getPredictedPercentBytes() {
- return predictedPercentBytes;
- }
-
- public void setPredictedPercentBytes(int predictedPercentBytes) {
- this.predictedPercentBytes = predictedPercentBytes;
- }
-
-
@Override
public ConnectionStatus clone() {
final ConnectionStatus clonedObj = new ConnectionStatus();
@@ -275,16 +212,15 @@ public class ConnectionStatus implements Cloneable {
clonedObj.sourceName = sourceName;
clonedObj.destinationId = destinationId;
clonedObj.destinationName = destinationName;
+
+ if (predictions != null) {
+ clonedObj.setPredictions(predictions.clone());
+ }
+
clonedObj.backPressureDataSizeThreshold =
backPressureDataSizeThreshold;
clonedObj.backPressureObjectThreshold = backPressureObjectThreshold;
clonedObj.maxQueuedBytes = maxQueuedBytes;
clonedObj.maxQueuedCount = maxQueuedCount;
- clonedObj.nextPredictedQueuedBytes = nextPredictedQueuedBytes;
- clonedObj.nextPredictedQueuedCount = nextPredictedQueuedCount;
- clonedObj.predictedTimeToBytesBackpressureMillis =
predictedTimeToBytesBackpressureMillis;
- clonedObj.predictedTimeToCountBackpressureMillis =
predictedTimeToCountBackpressureMillis;
- clonedObj.predictedPercentCount = predictedPercentCount;
- clonedObj.predictedPercentBytes = predictedPercentBytes;
return clonedObj;
}
@@ -309,8 +245,6 @@ public class ConnectionStatus implements Cloneable {
builder.append(backPressureDataSizeThreshold);
builder.append(", backPressureObjectThreshold=");
builder.append(backPressureObjectThreshold);
- builder.append(", predictionsAvailable=");
- builder.append(predictionsAvailable);
builder.append(", inputCount=");
builder.append(inputCount);
builder.append(", inputBytes=");
@@ -327,18 +261,6 @@ public class ConnectionStatus implements Cloneable {
builder.append(maxQueuedCount);
builder.append(", maxQueueBytes=");
builder.append(maxQueuedBytes);
- builder.append(", nextPredictedQueuedBytes=");
- builder.append(nextPredictedQueuedBytes);
- builder.append(", nextPredictedQueuedCount=");
- builder.append(nextPredictedQueuedCount);
- builder.append(", predictedTimeToBytesBackpressureMillis=");
- builder.append(predictedTimeToBytesBackpressureMillis);
- builder.append(", predictedTimeToCountBackpressureMillis=");
- builder.append(predictedTimeToCountBackpressureMillis);
- builder.append(", predictedPercentCount=");
- builder.append(predictedPercentCount);
- builder.append(", predictedPercentBytes=");
- builder.append(predictedPercentBytes);
builder.append("]");
return builder.toString();
}
diff --git
a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusPredictions.java
b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusPredictions.java
new file mode 100644
index 0000000..b2263c6
--- /dev/null
+++
b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusPredictions.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.analytics;
+
+import org.apache.nifi.processor.DataUnit;
+
+/**
+ * Mutable holder for the predicted queue values of a single connection.
+ *
+ * <p>The values are plain primitives with no validation; callers populate them
+ * through the setters. Instances can be duplicated with {@link #clone()}, which
+ * copies every field.</p>
+ */
+public class ConnectionStatusPredictions implements Cloneable {
+    private long predictionIntervalMillis;
+    private int nextPredictedQueuedCount;
+    private long nextPredictedQueuedBytes;
+    private long predictedTimeToCountBackpressureMillis;
+    private long predictedTimeToBytesBackpressureMillis;
+    private int predictedPercentCount = 0;
+    private int predictedPercentBytes = 0;
+
+    /**
+     * @return the prediction interval, in milliseconds
+     */
+    public long getPredictionIntervalMillis() {
+        return predictionIntervalMillis;
+    }
+
+    public void setPredictionIntervalMillis(long predictionIntervalMillis) {
+        this.predictionIntervalMillis = predictionIntervalMillis;
+    }
+
+    /**
+     * @return the next predicted queued object count
+     */
+    public int getNextPredictedQueuedCount() {
+        return nextPredictedQueuedCount;
+    }
+
+    public void setNextPredictedQueuedCount(int nextPredictedQueuedCount) {
+        this.nextPredictedQueuedCount = nextPredictedQueuedCount;
+    }
+
+    /**
+     * @return the next predicted queued size, in bytes
+     */
+    public long getNextPredictedQueuedBytes() {
+        return nextPredictedQueuedBytes;
+    }
+
+    public void setNextPredictedQueuedBytes(long nextPredictedQueuedBytes) {
+        this.nextPredictedQueuedBytes = nextPredictedQueuedBytes;
+    }
+
+    /**
+     * @return the predicted time, in milliseconds, to count-based backpressure
+     */
+    public long getPredictedTimeToCountBackpressureMillis() {
+        return predictedTimeToCountBackpressureMillis;
+    }
+
+    public void setPredictedTimeToCountBackpressureMillis(long predictedTimeToCountBackpressureMillis) {
+        this.predictedTimeToCountBackpressureMillis = predictedTimeToCountBackpressureMillis;
+    }
+
+    /**
+     * @return the predicted time, in milliseconds, to bytes-based backpressure
+     */
+    public long getPredictedTimeToBytesBackpressureMillis() {
+        return predictedTimeToBytesBackpressureMillis;
+    }
+
+    public void setPredictedTimeToBytesBackpressureMillis(long predictedTimeToBytesBackpressureMillis) {
+        this.predictedTimeToBytesBackpressureMillis = predictedTimeToBytesBackpressureMillis;
+    }
+
+    /**
+     * @return the predicted percent use by count
+     */
+    public int getPredictedPercentCount() {
+        return predictedPercentCount;
+    }
+
+    public void setPredictedPercentCount(int predictedPercentCount) {
+        this.predictedPercentCount = predictedPercentCount;
+    }
+
+    /**
+     * @return the predicted percent use by bytes
+     */
+    public int getPredictedPercentBytes() {
+        return predictedPercentBytes;
+    }
+
+    public void setPredictedPercentBytes(int predictedPercentBytes) {
+        this.predictedPercentBytes = predictedPercentBytes;
+    }
+
+    /**
+     * Creates an independent copy of this object.
+     *
+     * @return a new instance carrying the same value in every field
+     */
+    @Override
+    public ConnectionStatusPredictions clone() {
+        final ConnectionStatusPredictions clonedObj = new ConnectionStatusPredictions();
+        // The interval must be copied along with the predicted values; leaving it
+        // out would reset it to 0 on every clone.
+        clonedObj.predictionIntervalMillis = predictionIntervalMillis;
+        clonedObj.nextPredictedQueuedBytes = nextPredictedQueuedBytes;
+        clonedObj.nextPredictedQueuedCount = nextPredictedQueuedCount;
+        clonedObj.predictedTimeToBytesBackpressureMillis = predictedTimeToBytesBackpressureMillis;
+        clonedObj.predictedTimeToCountBackpressureMillis = predictedTimeToCountBackpressureMillis;
+        clonedObj.predictedPercentCount = predictedPercentCount;
+        clonedObj.predictedPercentBytes = predictedPercentBytes;
+        return clonedObj;
+    }
+
+    /**
+     * @return a diagnostic string listing every field of this object
+     */
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        // This class has no id field, so the listing starts with the interval.
+        builder.append("ConnectionStatusPredictions [predictionIntervalMillis=");
+        builder.append(predictionIntervalMillis);
+        builder.append(", nextPredictedQueuedBytes=");
+        builder.append(nextPredictedQueuedBytes);
+        builder.append(", nextPredictedQueuedCount=");
+        builder.append(nextPredictedQueuedCount);
+        builder.append(", predictedTimeToBytesBackpressureMillis=");
+        builder.append(predictedTimeToBytesBackpressureMillis);
+        builder.append(", predictedTimeToCountBackpressureMillis=");
+        builder.append(predictedTimeToCountBackpressureMillis);
+        builder.append(", predictedPercentCount=");
+        builder.append(predictedPercentCount);
+        builder.append(", predictedPercentBytes=");
+        builder.append(predictedPercentBytes);
+        builder.append("]");
+        return builder.toString();
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusPredictionsSnapshotDTO.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusPredictionsSnapshotDTO.java
new file mode 100644
index 0000000..7ee2896
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusPredictionsSnapshotDTO.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.dto.status;
+
+import io.swagger.annotations.ApiModelProperty;
+
+import javax.xml.bind.annotation.XmlType;
+
+/**
+ * DTO for serializing the status predictions of a connection.
+ *
+ * <p>All values are boxed types, so any of them may be {@code null}; three of
+ * them default to zero and the rest default to {@code null}.</p>
+ */
+// The XML type name identifies this class specifically. The earlier value,
+// "connectionStatusSnapshot", did not describe this class and appears to have been
+// copied from the connection status snapshot DTO (NOTE(review): confirm that DTO's
+// name); JAXB requires distinct type names within a namespace.
+@XmlType(name = "connectionStatusPredictionsSnapshot")
+public class ConnectionStatusPredictionsSnapshotDTO implements Cloneable {
+    private Long predictedMillisUntilCountBackpressure = 0L;
+    private Long predictedMillisUntilBytesBackpressure = 0L;
+    private Integer predictionIntervalSeconds;
+    private Integer predictedCountAtNextInterval = 0;
+    private Long predictedBytesAtNextInterval = 0L;
+    private Integer predictedPercentCount;
+    private Integer predictedPercentBytes;
+
+    /**
+     * @return predicted milliseconds until backpressure is applied, based on the queued count
+     */
+    @ApiModelProperty("The predicted number of milliseconds before the connection will have backpressure applied, based on the queued count.")
+    public Long getPredictedMillisUntilCountBackpressure() {
+        return predictedMillisUntilCountBackpressure;
+    }
+
+    public void setPredictedMillisUntilCountBackpressure(Long predictedMillisUntilCountBackpressure) {
+        this.predictedMillisUntilCountBackpressure = predictedMillisUntilCountBackpressure;
+    }
+
+    /**
+     * @return predicted milliseconds until backpressure is applied, based on the queued bytes
+     */
+    @ApiModelProperty("The predicted number of milliseconds before the connection will have backpressure applied, based on the total number of bytes in the queue.")
+    public Long getPredictedMillisUntilBytesBackpressure() {
+        return predictedMillisUntilBytesBackpressure;
+    }
+
+    public void setPredictedMillisUntilBytesBackpressure(Long predictedMillisUntilBytesBackpressure) {
+        this.predictedMillisUntilBytesBackpressure = predictedMillisUntilBytesBackpressure;
+    }
+
+    /**
+     * @return predicted number of queued objects at the next configured interval
+     */
+    @ApiModelProperty("The predicted number of queued objects at the next configured interval.")
+    public Integer getPredictedCountAtNextInterval() {
+        return predictedCountAtNextInterval;
+    }
+
+    public void setPredictedCountAtNextInterval(Integer predictedCountAtNextInterval) {
+        this.predictedCountAtNextInterval = predictedCountAtNextInterval;
+    }
+
+    /**
+     * @return the configured prediction interval, in seconds
+     */
+    @ApiModelProperty("The configured interval (in seconds) for predicting connection queue count and size (and percent usage).")
+    public Integer getPredictionIntervalSeconds() {
+        return predictionIntervalSeconds;
+    }
+
+    public void setPredictionIntervalSeconds(Integer predictionIntervalSeconds) {
+        this.predictionIntervalSeconds = predictionIntervalSeconds;
+    }
+
+    /**
+     * @return predicted total number of queued bytes at the next configured interval
+     */
+    @ApiModelProperty("The predicted total number of bytes in the queue at the next configured interval.")
+    public Long getPredictedBytesAtNextInterval() {
+        return predictedBytesAtNextInterval;
+    }
+
+    public void setPredictedBytesAtNextInterval(Long predictedBytesAtNextInterval) {
+        this.predictedBytesAtNextInterval = predictedBytesAtNextInterval;
+    }
+
+    /**
+     * @return predicted percent use with respect to the queued flow file count
+     */
+    @ApiModelProperty("Predicted connection percent use regarding queued flow files count and backpressure threshold if configured.")
+    public Integer getPredictedPercentCount() {
+        return predictedPercentCount;
+    }
+
+    public void setPredictedPercentCount(Integer predictedPercentCount) {
+        this.predictedPercentCount = predictedPercentCount;
+    }
+
+    /**
+     * @return predicted percent use with respect to the queued flow file size
+     */
+    @ApiModelProperty("Predicted connection percent use regarding queued flow files size and backpressure threshold if configured.")
+    public Integer getPredictedPercentBytes() {
+        return predictedPercentBytes;
+    }
+
+    public void setPredictedPercentBytes(Integer predictedPercentBytes) {
+        this.predictedPercentBytes = predictedPercentBytes;
+    }
+
+    /**
+     * Creates a copy of this DTO by passing every value through the public setters.
+     *
+     * @return a new DTO holding the same values, including any {@code null}s
+     */
+    @Override
+    public ConnectionStatusPredictionsSnapshotDTO clone() {
+        final ConnectionStatusPredictionsSnapshotDTO other = new ConnectionStatusPredictionsSnapshotDTO();
+        other.setPredictedMillisUntilCountBackpressure(getPredictedMillisUntilCountBackpressure());
+        other.setPredictedMillisUntilBytesBackpressure(getPredictedMillisUntilBytesBackpressure());
+        other.setPredictionIntervalSeconds(getPredictionIntervalSeconds());
+        other.setPredictedCountAtNextInterval(getPredictedCountAtNextInterval());
+        other.setPredictedBytesAtNextInterval(getPredictedBytesAtNextInterval());
+        other.setPredictedPercentBytes(getPredictedPercentBytes());
+        other.setPredictedPercentCount(getPredictedPercentCount());
+
+        return other;
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusSnapshotDTO.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusSnapshotDTO.java
index b4e592a..e8c9df6 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusSnapshotDTO.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusSnapshotDTO.java
@@ -34,7 +34,7 @@ public class ConnectionStatusSnapshotDTO implements Cloneable
{
private String sourceName;
private String destinationId;
private String destinationName;
- private Boolean predictionsAvailable;
+ private ConnectionStatusPredictionsSnapshotDTO predictions;
private Integer flowFilesIn = 0;
private Long bytesIn = 0L;
private String input;
@@ -48,13 +48,6 @@ public class ConnectionStatusSnapshotDTO implements
Cloneable {
private String queuedCount;
private Integer percentUseCount;
private Integer percentUseBytes;
- private Long predictedMillisUntilCountBackpressure = 0L;
- private Long predictedMillisUntilBytesBackpressure = 0L;
- private Integer predictionIntervalSeconds;
- private Integer predictedCountAtNextInterval = 0;
- private Long predictedBytesAtNextInterval = 0L;
- private Integer predictedPercentCount;
- private Integer predictedPercentBytes;
/* getters / setters */
/**
@@ -189,15 +182,15 @@ public class ConnectionStatusSnapshotDTO implements
Cloneable {
}
/**
- * @return indicator showing if predictions available for this connection
+ * @return predictions for this connection
*/
- @ApiModelProperty("Indicator showing if predictions available for this
connection")
- public Boolean getPredictionsAvailable() {
- return predictionsAvailable;
+ @ApiModelProperty("Predictions, if available, for this connection (null if
not available)")
+ public ConnectionStatusPredictionsSnapshotDTO getPredictions() {
+ return predictions;
}
- public void setPredictionsAvailable(Boolean predictionsAvailable) {
- this.predictionsAvailable = predictionsAvailable;
+ public void setPredictions(ConnectionStatusPredictionsSnapshotDTO
predictions) {
+ this.predictions = predictions;
}
/**
@@ -290,69 +283,6 @@ public class ConnectionStatusSnapshotDTO implements
Cloneable {
this.percentUseBytes = percentUseBytes;
}
- @ApiModelProperty("The predicted number of milliseconds before the
connection will have backpressure applied, based on the queued count.")
- public Long getPredictedMillisUntilCountBackpressure() {
- return predictedMillisUntilCountBackpressure;
- }
-
- public void setPredictedMillisUntilCountBackpressure(Long
predictedMillisUntilCountBackpressure) {
- this.predictedMillisUntilCountBackpressure =
predictedMillisUntilCountBackpressure;
- }
-
- @ApiModelProperty("The predicted number of milliseconds before the
connection will have backpressure applied, based on the total number of bytes
in the queue.")
- public Long getPredictedMillisUntilBytesBackpressure() {
- return predictedMillisUntilBytesBackpressure;
- }
-
- public void setPredictedMillisUntilBytesBackpressure(Long
predictedMillisUntilBytesBackpressure) {
- this.predictedMillisUntilBytesBackpressure =
predictedMillisUntilBytesBackpressure;
- }
-
- @ApiModelProperty("The predicted number of queued objects at the next
configured interval.")
- public Integer getPredictedCountAtNextInterval() {
- return predictedCountAtNextInterval;
- }
-
- public void setPredictedCountAtNextInterval(Integer
predictedCountAtNextInterval) {
- this.predictedCountAtNextInterval = predictedCountAtNextInterval;
- }
-
- @ApiModelProperty("The configured interval (in seconds) for predicting
connection queue count and size (and percent usage).")
- public Integer getPredictionIntervalSeconds() {
- return predictionIntervalSeconds;
- }
-
- public void setPredictionIntervalSeconds(Integer
predictionIntervalSeconds) {
- this.predictionIntervalSeconds = predictionIntervalSeconds;
- }
-
- @ApiModelProperty("The predicted total number of bytes in the queue at the
next configured interval.")
- public Long getPredictedBytesAtNextInterval() {
- return predictedBytesAtNextInterval;
- }
-
- public void setPredictedBytesAtNextInterval(Long
predictedBytesAtNextInterval) {
- this.predictedBytesAtNextInterval = predictedBytesAtNextInterval;
- }
-
- @ApiModelProperty("Predicted connection percent use regarding queued flow
files count and backpressure threshold if configured.")
- public Integer getPredictedPercentCount() {
- return predictedPercentCount;
- }
-
- public void setPredictedPercentCount(Integer predictedPercentCount) {
- this.predictedPercentCount = predictedPercentCount;
- }
-
- @ApiModelProperty("Predicted connection percent use regarding queued flow
files size and backpressure threshold if configured.")
- public Integer getPredictedPercentBytes() {
- return predictedPercentBytes;
- }
-
- public void setPredictedPercentBytes(Integer predictedPercentBytes) {
- this.predictedPercentBytes = predictedPercentBytes;
- }
-
@Override
public ConnectionStatusSnapshotDTO clone() {
final ConnectionStatusSnapshotDTO other = new
ConnectionStatusSnapshotDTO();
@@ -363,7 +293,11 @@ public class ConnectionStatusSnapshotDTO implements
Cloneable {
other.setName(getName());
other.setSourceId(getSourceId());
other.setSourceName(getSourceName());
- other.setPredictionsAvailable(getPredictionsAvailable());
+
+ if (predictions != null) {
+ other.setPredictions(predictions.clone());
+ }
+
other.setFlowFilesIn(getFlowFilesIn());
other.setBytesIn(getBytesIn());
other.setInput(getInput());
@@ -377,13 +311,6 @@ public class ConnectionStatusSnapshotDTO implements
Cloneable {
other.setQueuedSize(getQueuedSize());
other.setPercentUseBytes(getPercentUseBytes());
other.setPercentUseCount(getPercentUseCount());
-
other.setPredictedMillisUntilCountBackpressure(getPredictedMillisUntilCountBackpressure());
-
other.setPredictedMillisUntilBytesBackpressure(getPredictedMillisUntilBytesBackpressure());
- other.setPredictionIntervalSeconds(getPredictionIntervalSeconds());
-
other.setPredictedCountAtNextInterval(getPredictedCountAtNextInterval());
-
other.setPredictedBytesAtNextInterval(getPredictedBytesAtNextInterval());
- other.setPredictedPercentBytes(getPredictedPercentBytes());
- other.setPredictedPercentCount(getPredictedPercentCount());
return other;
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
index 5bb09bc..05c65b7 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
@@ -36,22 +36,7 @@ import
org.apache.nifi.web.api.dto.diagnostics.JVMControllerDiagnosticsSnapshotD
import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsSnapshotDTO;
import org.apache.nifi.web.api.dto.diagnostics.JVMFlowDiagnosticsSnapshotDTO;
import org.apache.nifi.web.api.dto.diagnostics.JVMSystemDiagnosticsSnapshotDTO;
-import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
-import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
-import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
-import org.apache.nifi.web.api.dto.status.NodeConnectionStatusSnapshotDTO;
-import org.apache.nifi.web.api.dto.status.NodePortStatusSnapshotDTO;
-import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO;
-import org.apache.nifi.web.api.dto.status.NodeProcessorStatusSnapshotDTO;
-import
org.apache.nifi.web.api.dto.status.NodeRemoteProcessGroupStatusSnapshotDTO;
-import org.apache.nifi.web.api.dto.status.PortStatusDTO;
-import org.apache.nifi.web.api.dto.status.PortStatusSnapshotDTO;
-import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
-import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
-import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
-import org.apache.nifi.web.api.dto.status.ProcessorStatusSnapshotDTO;
-import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
-import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.*;
import org.apache.nifi.web.api.entity.ConnectionStatusSnapshotEntity;
import org.apache.nifi.web.api.entity.PortStatusSnapshotEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity;
@@ -480,7 +465,6 @@ public class StatusMerger {
target.setSourceName(toMerge.getSourceName());
target.setDestinationId(toMerge.getDestinationId());
target.setDestinationName(toMerge.getDestinationName());
- target.setPredictionsAvailable(toMerge.getPredictionsAvailable());
}
target.setFlowFilesIn(target.getFlowFilesIn() +
toMerge.getFlowFilesIn());
@@ -502,31 +486,37 @@ public class StatusMerger {
}
// Merge predicted values (minimum time to backpressure, maximum
percent at next interval
+ ConnectionStatusPredictionsSnapshotDTO targetPredictions =
target.getPredictions();
+ ConnectionStatusPredictionsSnapshotDTO toMergePredictions =
toMerge.getPredictions();
+
+ if (targetPredictions == null) {
+ target.setPredictions(toMergePredictions);
+ } else if (toMergePredictions != null) {
+ if (targetPredictions.getPredictionIntervalSeconds() == null) {
+
targetPredictions.setPredictionIntervalSeconds(toMergePredictions.getPredictionIntervalSeconds());
+ }
- if (target.getPredictionIntervalSeconds() == null) {
-
target.setPredictionIntervalSeconds(toMerge.getPredictionIntervalSeconds());
- }
-
- if (target.getPredictedMillisUntilBytesBackpressure() == null) {
-
target.setPredictedMillisUntilBytesBackpressure(toMerge.getPredictedMillisUntilBytesBackpressure());
- } else if (toMerge.getPredictedMillisUntilBytesBackpressure() != null)
{
-
target.setPredictedMillisUntilBytesBackpressure(Math.min(target.getPredictedMillisUntilBytesBackpressure(),
toMerge.getPredictedMillisUntilBytesBackpressure()));
- }
- if (target.getPredictedMillisUntilCountBackpressure() == null) {
-
target.setPredictedMillisUntilCountBackpressure(toMerge.getPredictedMillisUntilCountBackpressure());
- } else if (toMerge.getPredictedMillisUntilCountBackpressure() != null)
{
-
target.setPredictedMillisUntilCountBackpressure(Math.min(target.getPredictedMillisUntilCountBackpressure(),
toMerge.getPredictedMillisUntilCountBackpressure()));
- }
+ if (targetPredictions.getPredictedMillisUntilBytesBackpressure()
== null) {
+
targetPredictions.setPredictedMillisUntilBytesBackpressure(toMergePredictions.getPredictedMillisUntilBytesBackpressure());
+ } else if
(toMergePredictions.getPredictedMillisUntilBytesBackpressure() != null) {
+
targetPredictions.setPredictedMillisUntilBytesBackpressure(Math.min(targetPredictions.getPredictedMillisUntilBytesBackpressure(),
toMergePredictions.getPredictedMillisUntilBytesBackpressure()));
+ }
+ if (targetPredictions.getPredictedMillisUntilCountBackpressure()
== null) {
+
targetPredictions.setPredictedMillisUntilCountBackpressure(toMergePredictions.getPredictedMillisUntilCountBackpressure());
+ } else if
(toMergePredictions.getPredictedMillisUntilCountBackpressure() != null) {
+
targetPredictions.setPredictedMillisUntilCountBackpressure(Math.min(targetPredictions.getPredictedMillisUntilCountBackpressure(),
toMergePredictions.getPredictedMillisUntilCountBackpressure()));
+ }
- if (target.getPredictedPercentBytes() == null) {
-
target.setPredictedPercentBytes(toMerge.getPredictedPercentBytes());
- } else if (toMerge.getPercentUseBytes() != null) {
-
target.setPredictedPercentBytes(Math.max(target.getPredictedPercentBytes(),
toMerge.getPredictedPercentBytes()));
- }
- if (target.getPredictedPercentCount() == null) {
-
target.setPredictedPercentCount(toMerge.getPredictedPercentCount());
- } else if (toMerge.getPredictedPercentCount() != null) {
-
target.setPredictedPercentCount(Math.max(target.getPredictedPercentCount(),
toMerge.getPredictedPercentCount()));
+ if (targetPredictions.getPredictedPercentBytes() == null) {
+
targetPredictions.setPredictedPercentBytes(toMergePredictions.getPredictedPercentBytes());
+ } else if (toMerge.getPercentUseBytes() != null) {
+
targetPredictions.setPredictedPercentBytes(Math.max(targetPredictions.getPredictedPercentBytes(),
toMergePredictions.getPredictedPercentBytes()));
+ }
+ if (targetPredictions.getPredictedPercentCount() == null) {
+
targetPredictions.setPredictedPercentCount(toMergePredictions.getPredictedPercentCount());
+ } else if (toMergePredictions.getPredictedPercentCount() != null) {
+
targetPredictions.setPredictedPercentCount(Math.max(targetPredictions.getPredictedPercentCount(),
toMergePredictions.getPredictedPercentCount()));
+ }
}
updatePrettyPrintedFields(target);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
index f350a43..4432620 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
@@ -52,6 +52,7 @@ import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.controller.status.RunStatus;
import org.apache.nifi.controller.status.TransmissionStatus;
+import org.apache.nifi.controller.status.analytics.ConnectionStatusPredictions;
import org.apache.nifi.controller.status.analytics.StatusAnalytics;
import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine;
import org.apache.nifi.groups.ProcessGroup;
@@ -346,18 +347,19 @@ public class StandardEventAccess implements
UserAwareEventAccess {
if (statusAnalyticsEngine != null) {
StatusAnalytics statusAnalytics =
statusAnalyticsEngine.getStatusAnalytics(conn.getIdentifier());
if (statusAnalytics != null) {
- Map<String,Long> predictions =
statusAnalytics.getPredictions();
- connStatus.setPredictionsAvailable(true);
-
connStatus.setPredictedTimeToBytesBackpressureMillis(predictions.get("timeToBytesBackpressureMillis"));
-
connStatus.setPredictedTimeToCountBackpressureMillis(predictions.get("timeToCountBackpressureMillis"));
-
connStatus.setNextPredictedQueuedBytes(predictions.get("nextIntervalBytes"));
-
connStatus.setNextPredictedQueuedCount(predictions.get("nextIntervalCount").intValue());
-
connStatus.setPredictedPercentCount(predictions.get("nextIntervalPercentageUseCount").intValue());
-
connStatus.setPredictedPercentBytes(predictions.get("nextIntervalPercentageUseBytes").intValue());
-
connStatus.setPredictionIntervalMillis(predictions.get("intervalTimeMillis"));
+ Map<String,Long> predictionValues =
statusAnalytics.getPredictions();
+ ConnectionStatusPredictions predictions = new
ConnectionStatusPredictions();
+ connStatus.setPredictions(predictions);
+
predictions.setPredictedTimeToBytesBackpressureMillis(predictionValues.get("timeToBytesBackpressureMillis"));
+
predictions.setPredictedTimeToCountBackpressureMillis(predictionValues.get("timeToCountBackpressureMillis"));
+
predictions.setNextPredictedQueuedBytes(predictionValues.get("nextIntervalBytes"));
+
predictions.setNextPredictedQueuedCount(predictionValues.get("nextIntervalCount").intValue());
+
predictions.setPredictedPercentCount(predictionValues.get("nextIntervalPercentageUseCount").intValue());
+
predictions.setPredictedPercentBytes(predictionValues.get("nextIntervalPercentageUseBytes").intValue());
+
predictions.setPredictionIntervalMillis(predictionValues.get("intervalTimeMillis"));
}
}else{
- connStatus.setPredictionsAvailable(false);
+ connStatus.setPredictions(null);
}
if (isConnectionAuthorized) {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 0c4edc7..d081dcb 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -134,6 +134,7 @@ import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.analytics.ConnectionStatusPredictions;
import org.apache.nifi.controller.status.analytics.StatusAnalytics;
import org.apache.nifi.controller.status.history.GarbageCollectionHistory;
import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
@@ -229,18 +230,7 @@ import
org.apache.nifi.web.api.dto.provenance.lineage.LineageRequestDTO.LineageR
import org.apache.nifi.web.api.dto.provenance.lineage.LineageResultsDTO;
import org.apache.nifi.web.api.dto.provenance.lineage.ProvenanceLinkDTO;
import org.apache.nifi.web.api.dto.provenance.lineage.ProvenanceNodeDTO;
-import org.apache.nifi.web.api.dto.status.ConnectionStatisticsDTO;
-import org.apache.nifi.web.api.dto.status.ConnectionStatisticsSnapshotDTO;
-import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
-import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
-import org.apache.nifi.web.api.dto.status.PortStatusDTO;
-import org.apache.nifi.web.api.dto.status.PortStatusSnapshotDTO;
-import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
-import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
-import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
-import org.apache.nifi.web.api.dto.status.ProcessorStatusSnapshotDTO;
-import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
-import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.*;
import org.apache.nifi.web.api.entity.AccessPolicyEntity;
import org.apache.nifi.web.api.entity.AccessPolicySummaryEntity;
import org.apache.nifi.web.api.entity.AffectedComponentEntity;
@@ -1170,7 +1160,6 @@ public final class DtoFactory {
snapshot.setName(connectionStatus.getName());
snapshot.setSourceName(connectionStatus.getSourceName());
snapshot.setDestinationName(connectionStatus.getDestinationName());
-
snapshot.setPredictionsAvailable(connectionStatus.getPredictionsAvailable());
snapshot.setFlowFilesQueued(connectionStatus.getQueuedCount());
snapshot.setBytesQueued(connectionStatus.getQueuedBytes());
@@ -1181,25 +1170,37 @@ public final class DtoFactory {
snapshot.setFlowFilesOut(connectionStatus.getOutputCount());
snapshot.setBytesOut(connectionStatus.getOutputBytes());
+ ConnectionStatusPredictions predictions =
connectionStatus.getPredictions();
+ ConnectionStatusPredictionsSnapshotDTO predictionsDTO = null;
+ if (predictions != null) {
+ predictionsDTO = new ConnectionStatusPredictionsSnapshotDTO();
+ }
+
if (connectionStatus.getBackPressureObjectThreshold() > 0) {
snapshot.setPercentUseCount(Math.min(100,
StatusMerger.getUtilization(connectionStatus.getQueuedCount(),
connectionStatus.getBackPressureObjectThreshold())));
- snapshot.setPredictionIntervalSeconds(((Long)
(connectionStatus.getPredictionIntervalMillis() / 1000L)).intValue());
-
snapshot.setPredictedMillisUntilCountBackpressure(connectionStatus.getPredictedTimeToCountBackpressureMillis());
-
snapshot.setPredictedCountAtNextInterval(connectionStatus.getNextPredictedQueuedCount());
-
snapshot.setPredictedPercentCount(connectionStatus.getPredictedPercentCount());
-
snapshot.setPredictedPercentBytes(connectionStatus.getPredictedPercentBytes());
-
snapshot.setPredictionIntervalSeconds(((Long)(connectionStatus.getPredictionIntervalMillis()
/ 1000L)).intValue());
+ if (predictionsDTO != null) {
+ snapshot.setPredictions(predictionsDTO);
+ predictionsDTO.setPredictionIntervalSeconds(((Long)
(predictions.getPredictionIntervalMillis() / 1000L)).intValue());
+
predictionsDTO.setPredictedMillisUntilCountBackpressure(predictions.getPredictedTimeToCountBackpressureMillis());
+
predictionsDTO.setPredictedCountAtNextInterval(predictions.getNextPredictedQueuedCount());
+
predictionsDTO.setPredictedPercentCount(predictions.getPredictedPercentCount());
+
predictionsDTO.setPredictedPercentBytes(predictions.getPredictedPercentBytes());
+ predictionsDTO.setPredictionIntervalSeconds(((Long)
(predictions.getPredictionIntervalMillis() / 1000L)).intValue());
+ }
}
if (connectionStatus.getBackPressureBytesThreshold() > 0) {
snapshot.setPercentUseBytes(Math.min(100,
StatusMerger.getUtilization(connectionStatus.getQueuedBytes(),
connectionStatus.getBackPressureBytesThreshold())));
- snapshot.setPredictionIntervalSeconds(((Long)
(connectionStatus.getPredictionIntervalMillis() / 1000L)).intValue());
-
snapshot.setPredictedMillisUntilBytesBackpressure(connectionStatus.getPredictedTimeToBytesBackpressureMillis());
-
snapshot.setPredictedBytesAtNextInterval(connectionStatus.getNextPredictedQueuedBytes());
-
snapshot.setPredictedPercentCount(connectionStatus.getPredictedPercentCount());
-
snapshot.setPredictedPercentBytes(connectionStatus.getPredictedPercentBytes());
-
snapshot.setPredictionIntervalSeconds(((Long)(connectionStatus.getPredictionIntervalMillis()
/ 1000L)).intValue());
+ if (predictionsDTO != null) {
+ snapshot.setPredictions(predictionsDTO);
+ predictionsDTO.setPredictionIntervalSeconds(((Long)
(predictions.getPredictionIntervalMillis() / 1000L)).intValue());
+
predictionsDTO.setPredictedMillisUntilBytesBackpressure(predictions.getPredictedTimeToBytesBackpressureMillis());
+
predictionsDTO.setPredictedBytesAtNextInterval(predictions.getNextPredictedQueuedBytes());
+
predictionsDTO.setPredictedPercentCount(predictions.getPredictedPercentCount());
+
predictionsDTO.setPredictedPercentBytes(predictions.getPredictedPercentBytes());
+ predictionsDTO.setPredictionIntervalSeconds(((Long)
(predictions.getPredictionIntervalMillis() / 1000L)).intValue());
+ }
}
StatusMerger.updatePrettyPrintedFields(snapshot);