exceptionfactory commented on code in PR #7053:
URL: https://github.com/apache/nifi/pull/7053#discussion_r1140477367
##########
nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestService.java:
##########
@@ -102,23 +100,17 @@ public void send(final
org.apache.nifi.processors.grpc.FlowFileRequest request,
final ProcessSession session = sessionFactory.createSession();
// if there's no space available, reject the request.
- final long n = filesReceived.getAndIncrement() %
FILES_BEFORE_CHECKING_DESTINATION_SPACE;
- if (n == 0 || !spaceAvailable.get()) {
- if (context.getAvailableRelationships().isEmpty()) {
- spaceAvailable.set(false);
- final String message = "Received request from " + remoteHost +
" but no space available; Indicating Service Unavailable";
- if (logger.isDebugEnabled()) {
- logger.debug(message);
- }
- final FlowFileReply reply =
replyBuilder.setResponseCode(FlowFileReply.ResponseCode.ERROR)
- .setBody(message)
- .build();
- responseObserver.onNext(reply);
- responseObserver.onCompleted();
- return;
- } else {
- spaceAvailable.set(true);
+ if (backpressureChecker.isBackpressure()) {
+ final String message = "Received request from " + remoteHost + "
but no space available; Indicating Service Unavailable";
+ if (logger.isDebugEnabled()) {
+ logger.debug(message);
}
Review Comment:
The conditional `logger.isDebugEnabled()` can be removed since the message
is already computed. There are no cost savings from checking whether debug
is enabled at this point.
##########
nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/pom.xml:
##########
@@ -0,0 +1,141 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-grpc-bundle</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-grpc-common</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-utils</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-ssl-context-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-security-utils-api</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
Review Comment:
Is this dependency used?
##########
nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/GRPCConstants.java:
##########
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.grpc;
+
+public class GRPCConstants {
Review Comment:
This class name seems a bit too generic. Recommend renaming it to
`GRPCAttributeNames` rather than keeping it so generalized.
##########
nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/util/BackpressureChecker.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.grpc.util;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class BackpressureChecker {
+
+ public static final int RECHECK_THRESHOLD = 5;
+
+ private final ProcessContext processContext;
+ private final Set<Relationship> relationships;
+ private final AtomicLong requestCount = new AtomicLong(0L);
+ private final AtomicBoolean backPressure = new AtomicBoolean(false);
+
+ public BackpressureChecker(ProcessContext processContext,
Set<Relationship> relationships) {
+ this.processContext = processContext;
+ this.relationships = relationships;
+ }
+
+ public boolean isBackpressure() {
Review Comment:
Should this be renamed to `isBackpressured`?
```suggestion
public boolean isBackpressured() {
```
##########
nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestServiceInterceptor.java:
##########
@@ -114,7 +114,7 @@ public <I, O> ServerCall.Listener<I> interceptCall(
}
}
} catch (final SSLPeerUnverifiedException e) {
- logger.debug("skipping DN authorization for request from {}.",
new Object[] {clientIp}, e);
+ logger.debug("Skipping DN authorization for request from {}.",
new Object[] {clientIp}, e);
Review Comment:
The `Object[]` wrapper can be removed.
```suggestion
logger.debug("Skipping DN authorization for request from {}", clientIp, e);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]