nandorsoma commented on code in PR #6225:
URL: https://github.com/apache/nifi/pull/6225#discussion_r957824765
##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java:
##########
@@ -31,89 +31,89 @@
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.ssl.SSLContextService;
-import org.eclipse.paho.client.mqttv3.IMqttClient;
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.Properties;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import static org.apache.commons.lang3.EnumUtils.isValidEnumIgnoreCase;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE;
import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_310;
import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_311;
+import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_500;
import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_AUTO;
import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
public abstract class AbstractMQTTProcessor extends
AbstractSessionFactoryProcessor {
- public static int DISCONNECT_TIMEOUT = 5000;
+ private static final String DEFAULT_SESSION_EXPIRY_INTERVAL = "24 hrs";
protected ComponentLog logger;
- protected IMqttClient mqttClient;
- protected volatile String broker;
- protected volatile String brokerUri;
- protected volatile String clientID;
- protected MqttConnectOptions connOpts;
- protected MemoryPersistence persistence = new MemoryPersistence();
- public ProcessSessionFactory processSessionFactory;
+ protected MqttClientProperties clientProperties;
- public static final Validator QOS_VALIDATOR = new Validator() {
+ protected MqttClientFactory mqttClientFactory = new MqttClientFactory();
+ protected MqttClient mqttClient;
- @Override
- public ValidationResult validate(String subject, String input,
ValidationContext context) {
- Integer inputInt = Integer.parseInt(input);
- if (inputInt < 0 || inputInt > 2) {
- return new
ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must
be an integer between 0 and 2").build();
- }
- return new
ValidationResult.Builder().subject(subject).valid(true).build();
+ public ProcessSessionFactory processSessionFactory;
+
+ public static final Validator QOS_VALIDATOR = (subject, input, context) ->
{
+ Integer inputInt = Integer.parseInt(input);
+ if (inputInt < 0 || inputInt > 2) {
+ return new
ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must
be an integer between 0 and 2").build();
}
+ return new
ValidationResult.Builder().subject(subject).valid(true).build();
};
- public static final Validator BROKER_VALIDATOR = new Validator() {
-
- @Override
- public ValidationResult validate(String subject, String input,
ValidationContext context) {
- try{
- URI brokerURI = new URI(input);
- if (!"".equals(brokerURI.getPath())) {
- return new
ValidationResult.Builder().subject(subject).valid(false).explanation("the
broker URI cannot have a path. It currently is:" + brokerURI.getPath()).build();
- }
- if (!("tcp".equals(brokerURI.getScheme()) ||
"ssl".equals(brokerURI.getScheme()) || "ws".equals(brokerURI.getScheme()) ||
"wss".equals(brokerURI.getScheme()))) {
- return new
ValidationResult.Builder().subject(subject).valid(false).explanation("only the
'tcp', 'ssl', 'ws' and 'wss' schemes are supported.").build();
- }
- } catch (URISyntaxException e) {
- return new
ValidationResult.Builder().subject(subject).valid(false).explanation("it is not
valid URI syntax.").build();
+ public static final Validator BROKER_VALIDATOR = (subject, input, context)
-> {
+ try {
+ URI brokerURI = new URI(input);
+ if (!EMPTY.equals(brokerURI.getPath())) {
+ return new
ValidationResult.Builder().subject(subject).valid(false).explanation("the
broker URI cannot have a path. It currently is:" + brokerURI.getPath()).build();
}
- return new
ValidationResult.Builder().subject(subject).valid(true).build();
+ if (!isValidEnumIgnoreCase(MqttProtocolScheme.class,
brokerURI.getScheme())) {
+ return new
ValidationResult.Builder().subject(subject).valid(false)
+ .explanation("invalid scheme! supported schemes are: "
+ MqttProtocolScheme.getValuesAsString(", ")).build();
Review Comment:
Having two colons in the same sentence feels weird. What about using a comma instead?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]