ferencerdei commented on code in PR #7344:
URL: https://github.com/apache/nifi/pull/7344#discussion_r1263350692
##########
minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java:
##########
@@ -93,25 +79,47 @@ public enum MiNiFiProperties {
C2_REST_CALL_TIMEOUT("c2.rest.callTimeout", "10 sec", false, true,
TIME_PERIOD_VALIDATOR),
C2_MAX_IDLE_CONNECTIONS("c2.rest.maxIdleConnections", "5", false, true,
NON_NEGATIVE_INTEGER_VALIDATOR),
C2_KEEP_ALIVE_DURATION("c2.rest.keepAliveDuration", "5 min", false, true,
TIME_PERIOD_VALIDATOR),
- C2_AGENT_HEARTBEAT_PERIOD("c2.agent.heartbeat.period", "1000", false,
true, LONG_VALIDATOR),
- C2_AGENT_CLASS("c2.agent.class", "", false, true, VALID),
+ C2_REST_HTTP_HEADERS("c2.rest.http.headers", "", false, true, VALID),
Review Comment:
What do you think about setting the default value to "Accept:text/json"?
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiExecCommandProvider.java:
##########
@@ -62,116 +84,75 @@ public MiNiFiExecCommandProvider(BootstrapFileProvider
bootstrapFileProvider) {
* @throws IOException throws IOException if any of the configuration file
read fails
*/
public List<String> getMiNiFiExecCommand(int listenPort, File workingDir)
throws IOException {
- Properties props = bootstrapFileProvider.getBootstrapProperties();
- File confDir = getFile(props.getProperty(CONF_DIR_KEY,
DEFAULT_CONF_DIR).trim(), workingDir);
- File libDir = getFile(props.getProperty("lib.dir",
DEFAULT_LIB_DIR).trim(), workingDir);
+ Properties bootstrapProperties =
bootstrapFileProvider.getBootstrapProperties();
+
+ File confDir = getFile(bootstrapProperties.getProperty(CONF_DIR_KEY,
DEFAULT_CONF_DIR).trim(), workingDir);
+ File libDir = getFile(bootstrapProperties.getProperty(LIB_DIR_KEY,
DEFAULT_LIB_DIR).trim(), workingDir);
+
String minifiLogDir = System.getProperty(LOG_DIR,
DEFAULT_LOG_DIR).trim();
String minifiAppLogFileName = System.getProperty(APP_LOG_FILE_NAME,
DEFAULT_APP_LOG_FILE_NAME).trim();
String minifiAppLogFileExtension =
System.getProperty(APP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim();
String minifiBootstrapLogFileName =
System.getProperty(BOOTSTRAP_LOG_FILE_NAME,
DEFAULT_BOOTSTRAP_LOG_FILE_NAME).trim();
String minifiBootstrapLogFileExtension =
System.getProperty(BOOTSTRAP_LOG_FILE_EXTENSION,
DEFAULT_LOG_FILE_EXTENSION).trim();
- List<String> cmd = new ArrayList<>();
- cmd.add(getJavaCommand(props));
- cmd.add("-classpath");
- cmd.add(buildClassPath(props, confDir, libDir));
- cmd.addAll(getJavaAdditionalArgs(props));
- cmd.add("-Dnifi.properties.file.path=" + getMiNiFiPropsFileName(props,
confDir));
- cmd.add("-Dnifi.bootstrap.listen.port=" + listenPort);
- cmd.add("-Dapp=MiNiFi");
- cmd.add("-D" + LOG_DIR + "=" + minifiLogDir);
- cmd.add("-D" + APP_LOG_FILE_NAME + "=" + minifiAppLogFileName);
- cmd.add("-D" + APP_LOG_FILE_EXTENSION + "=" +
minifiAppLogFileExtension);
- cmd.add("-D" + BOOTSTRAP_LOG_FILE_NAME + "=" +
minifiBootstrapLogFileName);
- cmd.add("-D" + BOOTSTRAP_LOG_FILE_EXTENSION + "=" +
minifiBootstrapLogFileExtension);
- cmd.add("org.apache.nifi.minifi.MiNiFi");
-
- return cmd;
+ return List.of(
+ getJavaCommand(bootstrapProperties),
+ "-classpath",
+ buildClassPath(confDir, libDir),
+ getJavaAdditionalArgs(bootstrapProperties),
Review Comment:
The string concatenation breaks the ProcessBuilder's behaviour: each argument
needs to be a separate entry in the list, like before (addAll was used
previously).
##########
c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java:
##########
@@ -186,6 +202,9 @@ public long getKeepAliveDuration() {
*/
public static class Builder {
+ private static final String HTTP_HEADERS_SEPARATOR = ",";
Review Comment:
It's not uncommon for a header value to contain a comma or a colon. I'm not
sure whether we need to add escaping for these or simply document that it's
not possible to use them at the moment.
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java:
##########
@@ -121,10 +122,18 @@ private void start() throws IOException,
InterruptedException {
Properties bootstrapProperties =
bootstrapFileProvider.getBootstrapProperties();
String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
- initConfigFiles(bootstrapProperties, confDir);
- Process process = startMiNiFi();
+ DEFAULT_LOGGER.debug("Generating minifi.properties from
bootstrap.conf");
Review Comment:
Can we move the logging into the initConfigFile method? Maybe we can also
rename the method to generateMinifiProperties, if that is really the only
thing it generates.
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java:
##########
@@ -74,12 +71,14 @@ public class StartRunner implements CommandRunner {
private volatile ShutdownHook shutdownHook;
private final MiNiFiExecCommandProvider miNiFiExecCommandProvider;
private final ConfigurationChangeListener configurationChangeListener;
+ private final MiNiFiPropertiesGenerator miNiFiPropertiesGenerator;
private int listenPort;
public StartRunner(CurrentPortProvider currentPortProvider,
BootstrapFileProvider bootstrapFileProvider,
- PeriodicStatusReporterManager periodicStatusReporterManager,
MiNiFiStdLogHandler miNiFiStdLogHandler, MiNiFiParameters miNiFiParameters,
File bootstrapConfigFile,
- RunMiNiFi runMiNiFi, MiNiFiExecCommandProvider
miNiFiExecCommandProvider, ConfigurationChangeListener
configurationChangeListener) {
+ PeriodicStatusReporterManager
periodicStatusReporterManager, MiNiFiStdLogHandler miNiFiStdLogHandler,
MiNiFiParameters miNiFiParameters,
+ File bootstrapConfigFile,
Review Comment:
This looks a little strange alone on this line.
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java:
##########
@@ -35,64 +39,72 @@
public class ConfigurationChangeCoordinator implements Closeable,
ConfigurationChangeNotifier {
- public static final String NOTIFIER_PROPERTY_PREFIX =
"nifi.minifi.notifier";
- public static final String NOTIFIER_INGESTORS_KEY =
NOTIFIER_PROPERTY_PREFIX + ".ingestors";
- private static final Logger LOGGER =
LoggerFactory.getLogger(ConfigurationChangeCoordinator.class);
+ public static final String NOTIFIER_INGESTORS_KEY =
"nifi.minifi.notifier.ingestors";
- private final Set<ConfigurationChangeListener>
configurationChangeListeners;
- private final Set<ChangeIngestor> changeIngestors = new HashSet<>();
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ConfigurationChangeCoordinator.class);
+ private static final String COMMA = ",";
private final BootstrapFileProvider bootstrapFileProvider;
private final RunMiNiFi runMiNiFi;
+ private final Set<ConfigurationChangeListener>
configurationChangeListeners;
+ private final Set<ChangeIngestor> changeIngestors;
public ConfigurationChangeCoordinator(BootstrapFileProvider
bootstrapFileProvider, RunMiNiFi runMiNiFi,
- Set<ConfigurationChangeListener> miNiFiConfigurationChangeListeners) {
+ Set<ConfigurationChangeListener>
miNiFiConfigurationChangeListeners) {
this.bootstrapFileProvider = bootstrapFileProvider;
this.runMiNiFi = runMiNiFi;
- this.configurationChangeListeners =
Optional.ofNullable(miNiFiConfigurationChangeListeners).map(Collections::unmodifiableSet).orElse(Collections.emptySet());
+ this.configurationChangeListeners =
ofNullable(miNiFiConfigurationChangeListeners).map(Collections::unmodifiableSet).orElse(Collections.emptySet());
+ this.changeIngestors = new HashSet<>();
+ }
+
+ @Override
+ public Collection<ListenerHandleResult> notifyListeners(ByteBuffer
newFlowConfig) {
+ LOGGER.info("Notifying Listeners of a change");
+ return configurationChangeListeners.stream()
+ .map(listener -> notifyListener(newFlowConfig, listener))
+ .collect(toList());
+ }
+
+ @Override
+ public void close() {
+ closeIngestors();
}
/**
* Begins the associated notification service provided by the given
implementation. In most implementations, no action will occur until this
method is invoked.
*/
- public void start() throws IOException{
+ public void start() throws IOException {
initialize();
changeIngestors.forEach(ChangeIngestor::start);
}
- /**
- * Provides an immutable collection of listeners for the notifier instance
- *
- * @return a collection of those listeners registered for notifications
- */
- public Set<ConfigurationChangeListener> getChangeListeners() {
- return Collections.unmodifiableSet(configurationChangeListeners);
+ private ListenerHandleResult notifyListener(ByteBuffer newFlowConfig,
ConfigurationChangeListener listener) {
+ try {
+ listener.handleChange(new
ByteBufferInputStream(newFlowConfig.duplicate()));
+ ListenerHandleResult listenerHandleResult = new
ListenerHandleResult(listener);
+ LOGGER.info("Listener notification result {}",
listenerHandleResult);
+ return listenerHandleResult;
+ } catch (ConfigurationChangeException ex) {
+ ListenerHandleResult listenerHandleResult = new
ListenerHandleResult(listener, ex);
+ LOGGER.info("Listener notification result {} with failure {}",
listenerHandleResult, ex);
+ return listenerHandleResult;
+ }
}
- /**
- * Provide the mechanism by which listeners are notified
- */
- public Collection<ListenerHandleResult> notifyListeners(ByteBuffer
newConfig) {
- LOGGER.info("Notifying Listeners of a change");
+ private void initialize() throws IOException {
+ closeIngestors();
- Collection<ListenerHandleResult> listenerHandleResults = new
ArrayList<>(configurationChangeListeners.size());
- for (final ConfigurationChangeListener listener :
getChangeListeners()) {
- ListenerHandleResult result;
- try {
- listener.handleChange(new
ByteBufferInputStream(newConfig.duplicate()));
- result = new ListenerHandleResult(listener);
- } catch (ConfigurationChangeException ex) {
- result = new ListenerHandleResult(listener, ex);
- }
- listenerHandleResults.add(result);
- LOGGER.info("Listener notification result: {}", result);
- }
- return listenerHandleResults;
+ Properties bootstrapProperties =
bootstrapFileProvider.getBootstrapProperties();
+ ofNullable(bootstrapProperties.getProperty(NOTIFIER_INGESTORS_KEY))
+ .filter(not(String::isBlank))
+ .map(ingestors -> ingestors.split(COMMA))
+ .map(Stream::of)
Review Comment:
you can use .flatMap(Stream::of)
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiator.java:
##########
@@ -29,59 +27,31 @@
public abstract class WholeConfigDifferentiator {
-
- private final static Logger logger =
LoggerFactory.getLogger(WholeConfigDifferentiator.class);
-
public static final String WHOLE_CONFIG_KEY = "Whole Config";
- volatile ConfigurationFileHolder configurationFileHolder;
-
- boolean compareInputStreamToConfigFile(InputStream inputStream) throws
IOException {
- logger.debug("Checking if change is different");
- AtomicReference<ByteBuffer> currentConfigFileReference =
configurationFileHolder.getConfigFileReference();
- ByteBuffer currentConfigFile = currentConfigFileReference.get();
- ByteBuffer byteBuffer = ByteBuffer.allocate(currentConfigFile.limit());
- DataInputStream dataInputStream = new DataInputStream(inputStream);
- try {
- dataInputStream.readFully(byteBuffer.array());
- } catch (EOFException e) {
- logger.debug("New config is shorter than the current. Must be
different.");
- return true;
- }
- logger.debug("Read the input");
+ private final static Logger logger =
LoggerFactory.getLogger(WholeConfigDifferentiator.class);
- if (dataInputStream.available() != 0) {
- return true;
- } else {
- return byteBuffer.compareTo(currentConfigFile) != 0;
- }
- }
+ protected volatile ConfigurationFileHolder configurationFileHolder;
public void initialize(ConfigurationFileHolder configurationFileHolder) {
this.configurationFileHolder = configurationFileHolder;
}
-
- public static class InputStreamInput extends WholeConfigDifferentiator
implements Differentiator<InputStream> {
- public boolean isNew(InputStream inputStream) throws IOException {
- return compareInputStreamToConfigFile(inputStream);
- }
- }
-
- public static class ByteBufferInput extends WholeConfigDifferentiator
implements Differentiator<ByteBuffer> {
- public boolean isNew(ByteBuffer inputBuffer) {
- AtomicReference<ByteBuffer> currentConfigFileReference =
configurationFileHolder.getConfigFileReference();
- ByteBuffer currentConfigFile = currentConfigFileReference.get();
- return inputBuffer.compareTo(currentConfigFile) != 0;
+ public static class ByteBufferInputDifferentiator extends
WholeConfigDifferentiator implements Differentiator<ByteBuffer> {
+ public boolean isNew(ByteBuffer newFlowConfig) {
+ AtomicReference<ByteBuffer> currentFlowConfigReference =
configurationFileHolder.getConfigFileReference();
+ ByteBuffer currentFlowConfig = currentFlowConfigReference.get();
+ logger.debug("Comparing byte buffers:\n newFlow={}\n
existingFlow={}", newFlowConfig, currentFlowConfig);
Review Comment:
this log line doesn't add any value in my opinion, so I would remove it:
Comparing byte buffers:
newFlow=java.nio.HeapByteBuffer[pos=0 lim=921 cap=921]
existingFlow=java.nio.HeapByteBuffer[pos=0 lim=921 cap=921]
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestor.java:
##########
@@ -231,15 +215,21 @@ private void createSecureConnector(Properties properties)
{
logger.info("HTTPS Connector added for Host [{}] and Port [{}]",
https.getHost(), https.getPort());
}
- protected void setDifferentiator(Differentiator<ByteBuffer>
differentiator) {
+ private Supplier<IllegalArgumentException>
unableToFindDifferentiatorExceptionSupplier(String differentiator) {
Review Comment:
Even if it's possible, I would rather minimise further refactors in this PR.
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.copy;
+import static java.nio.file.Files.deleteIfExists;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.newOutputStream;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.minifi.commons.api.FlowEnrichService;
+import org.apache.nifi.services.FlowService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultUpdateConfigurationStrategy implements
UpdateConfigurationStrategy {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DefaultUpdateConfigurationStrategy.class);
+
+ private final FlowController flowController;
+ private final FlowService flowService;
+ private final FlowEnrichService flowEnrichService;
+ private final Path flowConfigurationFile;
+ private final Path backupFlowConfigurationFile;
+ private final Path rawFlowConfigurationFile;
+ private final Path backupRawFlowConfigurationFile;
+
+ public DefaultUpdateConfigurationStrategy(FlowController flowController,
FlowService flowService, FlowEnrichService flowEnrichService, String
flowConfigurationFile) {
+ this.flowController = flowController;
+ this.flowService = flowService;
+ this.flowEnrichService = flowEnrichService;
+ Path flowConfigurationFilePath =
Path.of(flowConfigurationFile).toAbsolutePath();
+ this.flowConfigurationFile = flowConfigurationFilePath;
+ this.backupFlowConfigurationFile = Path.of(flowConfigurationFilePath +
BACKUP_EXTENSION);
+ String flowConfigurationFileBaseName =
FilenameUtils.getBaseName(flowConfigurationFilePath.toString());
+ this.rawFlowConfigurationFile =
flowConfigurationFilePath.getParent().resolve(flowConfigurationFileBaseName +
RAW_EXTENSION);
+ this.backupRawFlowConfigurationFile =
flowConfigurationFilePath.getParent().resolve(flowConfigurationFileBaseName +
BACKUP_EXTENSION + RAW_EXTENSION);
+ }
+
+ @Override
+ public boolean update(byte[] rawFlow) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Attempting to update flow with content: \n{}", new
String(rawFlow, UTF_8));
+ }
+ try {
+ byte[] enrichedFlowCandidate =
flowEnrichService.enrichFlow(rawFlow);
+ stopRootProcessGroup();
Review Comment:
@exceptionfactory, could you help me out here with the review? You are more
familiar with the NiFi core functionality, so I would like to double check with
you to see if this is the correct way to stop ongoing processing / reload and
restart the flow. Thank you in advance
##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationStrategy.java:
##########
@@ -15,17 +15,10 @@
* limitations under the License.
*/
-package org.apache.nifi.minifi.c2.provider.nifi.rest;
+package org.apache.nifi.c2.client.service.operation;
-import java.io.IOException;
+@FunctionalInterface
+public interface UpdateConfigurationStrategy {
Review Comment:
Could you add some class / method level javadoc about the intention of this
class?
##########
c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java:
##########
@@ -321,6 +343,21 @@ public Builder connectTimeout(long connectTimeout) {
return this;
}
+ public Builder httpHeaders(String httpHeaders) {
+ this.httpHeaders = ofNullable(httpHeaders)
+ .filter(StringUtils::isNotBlank)
+ .map(headers -> headers.split(HTTP_HEADERS_SEPARATOR))
+ .map(Arrays::stream)
+ .orElseGet(Stream::of)
Review Comment:
this can be simplified with .stream().flatMap(Arrays::stream)
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java:
##########
@@ -90,261 +89,233 @@ public class PullHttpChangeIngestor extends
AbstractPullChangeIngestor {
public static final String DIFFERENTIATOR_KEY = PULL_HTTP_BASE_KEY +
".differentiator";
public static final String USE_ETAG_KEY = PULL_HTTP_BASE_KEY + ".use.etag";
public static final String OVERRIDE_SECURITY = PULL_HTTP_BASE_KEY +
".override.security";
+ public static final String HTTP_HEADERS = PULL_HTTP_BASE_KEY + ".headers";
+
+ private static final Logger logger =
LoggerFactory.getLogger(PullHttpChangeIngestor.class);
+
+ private static final Map<String, Supplier<Differentiator<ByteBuffer>>>
DIFFERENTIATOR_CONSTRUCTOR_MAP = Map.of(
+ WHOLE_CONFIG_KEY,
WholeConfigDifferentiator::getByteBufferDifferentiator
+ );
+ private static final int NOT_MODIFIED_STATUS_CODE = 304;
+ private static final String DEFAULT_CONNECT_TIMEOUT_MS = "5000";
+ private static final String DEFAULT_READ_TIMEOUT_MS = "15000";
+ private static final String DOUBLE_QUOTES = "\"";
+ private static final String ETAG_HEADER = "ETag";
+ private static final String PROXY_AUTHORIZATION_HEADER =
"Proxy-Authorization";
+ private static final String DEFAULT_PATH = "/";
+ private static final int BAD_REQUEST_STATUS_CODE = 400;
+ private static final String IF_NONE_MATCH_HEADER_KEY = "If-None-Match";
+ private static final String HTTP_HEADERS_SEPARATOR = ",";
+ private static final String HTTP_HEADER_KEY_VALUE_SEPARATOR = ":";
private final AtomicReference<OkHttpClient> httpClientReference = new
AtomicReference<>();
private final AtomicReference<Integer> portReference = new
AtomicReference<>();
private final AtomicReference<String> hostReference = new
AtomicReference<>();
private final AtomicReference<String> pathReference = new
AtomicReference<>();
private final AtomicReference<String> queryReference = new
AtomicReference<>();
+ private final AtomicReference<Map<String, String>> httpHeadersReference =
new AtomicReference<>();
+
private volatile Differentiator<ByteBuffer> differentiator;
private volatile String connectionScheme;
private volatile String lastEtag = "";
private volatile boolean useEtag = false;
- public PullHttpChangeIngestor() {
- logger = LoggerFactory.getLogger(PullHttpChangeIngestor.class);
- }
-
@Override
public void initialize(Properties properties, ConfigurationFileHolder
configurationFileHolder, ConfigurationChangeNotifier
configurationChangeNotifier) {
super.initialize(properties, configurationFileHolder,
configurationChangeNotifier);
-
pollingPeriodMS.set(Integer.parseInt(properties.getProperty(PULL_HTTP_POLLING_PERIOD_KEY,
DEFAULT_POLLING_PERIOD)));
+
pollingPeriodMS.set(Integer.parseInt(properties.getProperty(PULL_HTTP_POLLING_PERIOD_KEY,
DEFAULT_POLLING_PERIOD_MILLISECONDS)));
if (pollingPeriodMS.get() < 1) {
- throw new IllegalArgumentException("Property, " +
PULL_HTTP_POLLING_PERIOD_KEY + ", for the polling period ms must be set with a
positive integer.");
- }
-
- final String host = properties.getProperty(HOST_KEY);
- if (host == null || host.isEmpty()) {
- throw new IllegalArgumentException("Property, " + HOST_KEY + ",
for the hostname to pull configurations from must be specified.");
- }
-
- final String path = properties.getProperty(PATH_KEY, "/");
- final String query = properties.getProperty(QUERY_KEY, "");
-
- final String portString = (String) properties.get(PORT_KEY);
- final Integer port;
- if (portString == null) {
- throw new IllegalArgumentException("Property, " + PORT_KEY + ",
for the hostname to pull configurations from must be specified.");
- } else {
- port = Integer.parseInt(portString);
+ throw new IllegalArgumentException("Property, " +
PULL_HTTP_POLLING_PERIOD_KEY + ", for the polling period ms must be set with a
positive integer");
}
- portReference.set(port);
+ String host = ofNullable(properties.getProperty(HOST_KEY))
+ .filter(StringUtils::isNotBlank)
+ .orElseThrow(() -> new IllegalArgumentException("Property, " +
HOST_KEY + ", for the hostname to pull configurations from must be specified"));
+ String path = properties.getProperty(PATH_KEY, DEFAULT_PATH);
+ String query = properties.getProperty(QUERY_KEY, EMPTY);
+ Map<String, String> httpHeaders =
ofNullable(properties.getProperty(HTTP_HEADERS))
+ .filter(StringUtils::isNotBlank)
+ .map(headers -> headers.split(HTTP_HEADERS_SEPARATOR))
+ .map(Arrays::stream)
+ .orElseGet(Stream::of)
+ .map(String::trim)
+ .map(header -> header.split(HTTP_HEADER_KEY_VALUE_SEPARATOR))
+ .filter(split -> split.length == 2)
+ .collect(toMap(split ->
ofNullable(split[0]).map(String::trim).orElse(EMPTY), split ->
ofNullable(split[1]).map(String::trim).orElse(EMPTY)));
+ logger.debug("Configured HTTP headers: {}", httpHeaders);
+
+ ofNullable(properties.get(PORT_KEY))
+ .map(rawPort -> (String) rawPort)
Review Comment:
String.class::cast
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java:
##########
@@ -121,10 +122,18 @@ private void start() throws IOException,
InterruptedException {
Properties bootstrapProperties =
bootstrapFileProvider.getBootstrapProperties();
String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
- initConfigFiles(bootstrapProperties, confDir);
- Process process = startMiNiFi();
+ DEFAULT_LOGGER.debug("Generating minifi.properties from
bootstrap.conf");
+ initConfigFile(bootstrapProperties, confDir);
+ Path flowConfigFile =
Path.of(bootstrapProperties.getProperty(MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey())).toAbsolutePath();
Review Comment:
Could you extract lines 128-134 into a meaningfully named method? Also, the
System Admin Guide needs to be updated with this new approach; it still
contains the YAML-based one.
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf:
##########
@@ -150,17 +107,11 @@ java.arg.14=-Djava.awt.headless=true
#c2.rest.connectionTimeout=5 sec
#c2.rest.readTimeout=5 sec
#c2.rest.callTimeout=10 sec
-## heartbeat in milliseconds
-#c2.agent.heartbeat.period=5000
-## define parameters about your agent
-#c2.agent.class=
+# Comma separated list of HTTP headers, eg: Accept:text/json
+#c2.rest.http.headers=text/json
Review Comment:
seems the value is incorrect here as it doesn't contain the "Accept:" part
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java:
##########
@@ -56,165 +58,154 @@
*/
public class FileChangeIngestor implements Runnable, ChangeIngestor {
- private static final Map<String, Supplier<Differentiator<ByteBuffer>>>
DIFFERENTIATOR_CONSTRUCTOR_MAP;
-
- static {
- HashMap<String, Supplier<Differentiator<ByteBuffer>>> tempMap = new
HashMap<>();
- tempMap.put(WHOLE_CONFIG_KEY,
WholeConfigDifferentiator::getByteBufferDifferentiator);
+ private static final Map<String, Supplier<Differentiator<ByteBuffer>>>
DIFFERENTIATOR_CONSTRUCTOR_MAP = Map.of(
+ WHOLE_CONFIG_KEY,
WholeConfigDifferentiator::getByteBufferDifferentiator
+ );
- DIFFERENTIATOR_CONSTRUCTOR_MAP = Collections.unmodifiableMap(tempMap);
- }
+ static final String CONFIG_FILE_BASE_KEY = NOTIFIER_INGESTORS_KEY +
".file";
+ static final String CONFIG_FILE_PATH_KEY = CONFIG_FILE_BASE_KEY +
".config.path";
+ static final String POLLING_PERIOD_INTERVAL_KEY = CONFIG_FILE_BASE_KEY +
".polling.period.seconds";
+ static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
+ private final static Logger logger =
LoggerFactory.getLogger(FileChangeIngestor.class);
- protected static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
- protected static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT =
TimeUnit.SECONDS;
+ private static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT = SECONDS;
+ private static final String DIFFERENTIATOR_KEY = CONFIG_FILE_BASE_KEY +
".differentiator";
- private final static Logger logger =
LoggerFactory.getLogger(FileChangeIngestor.class);
- private static final String CONFIG_FILE_BASE_KEY = NOTIFIER_INGESTORS_KEY
+ ".file";
+ private volatile Differentiator<ByteBuffer> differentiator;
+ private volatile ConfigurationChangeNotifier configurationChangeNotifier;
- protected static final String CONFIG_FILE_PATH_KEY = CONFIG_FILE_BASE_KEY
+ ".config.path";
- protected static final String POLLING_PERIOD_INTERVAL_KEY =
CONFIG_FILE_BASE_KEY + ".polling.period.seconds";
- public static final String DIFFERENTIATOR_KEY = CONFIG_FILE_BASE_KEY +
".differentiator";
+ private ScheduledExecutorService executorService;
private Path configFilePath;
private WatchService watchService;
private long pollingSeconds;
- private volatile Differentiator<ByteBuffer> differentiator;
- private volatile ConfigurationChangeNotifier configurationChangeNotifier;
- private volatile ConfigurationFileHolder configurationFileHolder;
- private volatile Properties properties;
- private ScheduledExecutorService executorService;
- protected static WatchService initializeWatcher(Path filePath) {
+ @Override
+ public void initialize(Properties properties, ConfigurationFileHolder
configurationFileHolder, ConfigurationChangeNotifier
configurationChangeNotifier) {
+ Path configFile =
ofNullable(properties.getProperty(CONFIG_FILE_PATH_KEY))
+ .filter(not(String::isBlank))
+ .map(Path::of)
+ .map(Path::toAbsolutePath)
+ .orElseThrow(() -> new IllegalArgumentException("Property, " +
CONFIG_FILE_PATH_KEY + ", for the path of the config file must be specified"));
try {
- final WatchService fsWatcher =
FileSystems.getDefault().newWatchService();
- final Path watchDirectory = filePath.getParent();
- watchDirectory.register(fsWatcher, ENTRY_MODIFY);
+ this.configurationChangeNotifier = configurationChangeNotifier;
+ this.configFilePath = configFile;
+ this.pollingSeconds =
ofNullable(properties.getProperty(POLLING_PERIOD_INTERVAL_KEY,
Long.toString(DEFAULT_POLLING_PERIOD_INTERVAL)))
+ .map(Long::parseLong)
+ .filter(duration -> duration > 0)
+ .map(duration -> SECONDS.convert(duration,
DEFAULT_POLLING_PERIOD_UNIT))
+ .orElseThrow(() -> new IllegalArgumentException("Cannot
specify a polling period with duration <=0"));
+ this.watchService = initializeWatcher(configFile);
+ this.differentiator =
ofNullable(properties.getProperty(DIFFERENTIATOR_KEY))
+ .filter(not(String::isBlank))
+ .map(differentiator ->
ofNullable(DIFFERENTIATOR_CONSTRUCTOR_MAP.get(differentiator))
+ .map(Supplier::get)
+
.orElseThrow(unableToFindDifferentiatorExceptionSupplier(differentiator)))
+
.orElseGet(WholeConfigDifferentiator::getByteBufferDifferentiator);
+ this.differentiator.initialize(configurationFileHolder);
+ } catch (Exception e) {
+ throw new IllegalStateException("Could not successfully initialize
file change notifier", e);
+ }
- return fsWatcher;
- } catch (IOException ioe) {
- throw new IllegalStateException("Unable to initialize a file
system watcher for the path " + filePath, ioe);
+ if
(Path.of(properties.getProperty(MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey())).toAbsolutePath().equals(configFile))
{
Review Comment:
this should check the flow.json.raw if I'm not wrong
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java:
##########
@@ -35,64 +39,72 @@
public class ConfigurationChangeCoordinator implements Closeable,
ConfigurationChangeNotifier {
Review Comment:
nice refactor thanks!
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java:
##########
@@ -56,165 +58,154 @@
*/
public class FileChangeIngestor implements Runnable, ChangeIngestor {
- private static final Map<String, Supplier<Differentiator<ByteBuffer>>>
DIFFERENTIATOR_CONSTRUCTOR_MAP;
-
- static {
- HashMap<String, Supplier<Differentiator<ByteBuffer>>> tempMap = new
HashMap<>();
- tempMap.put(WHOLE_CONFIG_KEY,
WholeConfigDifferentiator::getByteBufferDifferentiator);
+ private static final Map<String, Supplier<Differentiator<ByteBuffer>>>
DIFFERENTIATOR_CONSTRUCTOR_MAP = Map.of(
+ WHOLE_CONFIG_KEY,
WholeConfigDifferentiator::getByteBufferDifferentiator
+ );
- DIFFERENTIATOR_CONSTRUCTOR_MAP = Collections.unmodifiableMap(tempMap);
- }
+ static final String CONFIG_FILE_BASE_KEY = NOTIFIER_INGESTORS_KEY +
".file";
+ static final String CONFIG_FILE_PATH_KEY = CONFIG_FILE_BASE_KEY +
".config.path";
+ static final String POLLING_PERIOD_INTERVAL_KEY = CONFIG_FILE_BASE_KEY +
".polling.period.seconds";
+ static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
+ private final static Logger logger =
LoggerFactory.getLogger(FileChangeIngestor.class);
- protected static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
- protected static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT =
TimeUnit.SECONDS;
+ private static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT = SECONDS;
+ private static final String DIFFERENTIATOR_KEY = CONFIG_FILE_BASE_KEY +
".differentiator";
- private final static Logger logger =
LoggerFactory.getLogger(FileChangeIngestor.class);
- private static final String CONFIG_FILE_BASE_KEY = NOTIFIER_INGESTORS_KEY
+ ".file";
+ private volatile Differentiator<ByteBuffer> differentiator;
+ private volatile ConfigurationChangeNotifier configurationChangeNotifier;
- protected static final String CONFIG_FILE_PATH_KEY = CONFIG_FILE_BASE_KEY
+ ".config.path";
- protected static final String POLLING_PERIOD_INTERVAL_KEY =
CONFIG_FILE_BASE_KEY + ".polling.period.seconds";
- public static final String DIFFERENTIATOR_KEY = CONFIG_FILE_BASE_KEY +
".differentiator";
+ private ScheduledExecutorService executorService;
private Path configFilePath;
private WatchService watchService;
private long pollingSeconds;
- private volatile Differentiator<ByteBuffer> differentiator;
- private volatile ConfigurationChangeNotifier configurationChangeNotifier;
- private volatile ConfigurationFileHolder configurationFileHolder;
- private volatile Properties properties;
- private ScheduledExecutorService executorService;
- protected static WatchService initializeWatcher(Path filePath) {
+ @Override
+ public void initialize(Properties properties, ConfigurationFileHolder
configurationFileHolder, ConfigurationChangeNotifier
configurationChangeNotifier) {
+ Path configFile =
ofNullable(properties.getProperty(CONFIG_FILE_PATH_KEY))
+ .filter(not(String::isBlank))
+ .map(Path::of)
+ .map(Path::toAbsolutePath)
+ .orElseThrow(() -> new IllegalArgumentException("Property, " +
CONFIG_FILE_PATH_KEY + ", for the path of the config file must be specified"));
try {
- final WatchService fsWatcher =
FileSystems.getDefault().newWatchService();
- final Path watchDirectory = filePath.getParent();
- watchDirectory.register(fsWatcher, ENTRY_MODIFY);
+ this.configurationChangeNotifier = configurationChangeNotifier;
+ this.configFilePath = configFile;
+ this.pollingSeconds =
ofNullable(properties.getProperty(POLLING_PERIOD_INTERVAL_KEY,
Long.toString(DEFAULT_POLLING_PERIOD_INTERVAL)))
+ .map(Long::parseLong)
+ .filter(duration -> duration > 0)
+ .map(duration -> SECONDS.convert(duration,
DEFAULT_POLLING_PERIOD_UNIT))
+ .orElseThrow(() -> new IllegalArgumentException("Cannot
specify a polling period with duration <=0"));
+ this.watchService = initializeWatcher(configFile);
+ this.differentiator =
ofNullable(properties.getProperty(DIFFERENTIATOR_KEY))
+ .filter(not(String::isBlank))
+ .map(differentiator ->
ofNullable(DIFFERENTIATOR_CONSTRUCTOR_MAP.get(differentiator))
+ .map(Supplier::get)
+
.orElseThrow(unableToFindDifferentiatorExceptionSupplier(differentiator)))
+
.orElseGet(WholeConfigDifferentiator::getByteBufferDifferentiator);
+ this.differentiator.initialize(configurationFileHolder);
+ } catch (Exception e) {
+ throw new IllegalStateException("Could not successfully initialize
file change notifier", e);
+ }
- return fsWatcher;
- } catch (IOException ioe) {
- throw new IllegalStateException("Unable to initialize a file
system watcher for the path " + filePath, ioe);
+ if
(Path.of(properties.getProperty(MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey())).toAbsolutePath().equals(configFile))
{
+ throw new IllegalStateException("File ingestor config file (" +
CONFIG_FILE_PATH_KEY
+ + ") must point to a different file than MiNiFi flow config
file (" + MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey() + ")");
}
}
- protected boolean targetChanged() {
- boolean targetChanged;
+ @Override
+ public void start() {
+ executorService = Executors.newScheduledThreadPool(1, runnable -> {
+ Thread notifierThread =
Executors.defaultThreadFactory().newThread(runnable);
+ notifierThread.setName("File Change Notifier Thread");
+ notifierThread.setDaemon(true);
+ return notifierThread;
+ });
+ executorService.scheduleWithFixedDelay(this, 0, pollingSeconds,
DEFAULT_POLLING_PERIOD_UNIT);
+ }
+
+ @Override
+ public void run() {
+ logger.debug("Checking for a change in {}", configFilePath);
+ if (targetFileChanged()) {
+ logger.debug("Target file changed, checking if it's different than
current flow");
+ try (FileInputStream flowCandidateInputStream = new
FileInputStream(configFilePath.toFile())) {
+ ByteBuffer newFlowConfig =
wrap(toByteArray(flowCandidateInputStream));
+ if (differentiator.isNew(newFlowConfig)) {
+ logger.debug("Current flow and new flow is different,
notifying listener");
+ configurationChangeNotifier.notifyListeners(newFlowConfig);
+ logger.debug("Listeners have been notified");
+ }
+ } catch (Exception e) {
+ logger.error("Could not successfully notify listeners.", e);
+ }
+ } else {
+ logger.debug("No change detected in {}", configFilePath);
+ }
+ }
- Optional<WatchKey> watchKey = Optional.ofNullable(watchService.poll());
+ @Override
+ public void close() {
+ if (executorService != null) {
+ executorService.shutdownNow();
+ }
+ }
- targetChanged = watchKey
+ boolean targetFileChanged() {
+ logger.debug("Attempting to acquire watch key");
Review Comment:
Are these watch key acquire logs important at debug level? I would move them to
trace; I just tried it and the debug log is a little bit noisy with them.
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java:
##########
@@ -14,115 +14,146 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.nifi.minifi.bootstrap.service;
+import static java.nio.ByteBuffer.wrap;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.copy;
+import static java.nio.file.Files.deleteIfExists;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.Files.newOutputStream;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
-import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.CONF_DIR_KEY;
-import static
org.apache.nifi.minifi.bootstrap.RunMiNiFi.MINIFI_CONFIG_FILE_KEY;
-import static
org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.generateConfigFiles;
-
-import java.io.File;
-import java.io.FileInputStream;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.commons.io.IOUtils.closeQuietly;
+import static org.apache.commons.io.IOUtils.toByteArray;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
-import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.Properties;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.commons.io.IOUtils;
+import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.FilenameUtils;
import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
import
org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
import
org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.commons.api.FlowEnrichService;
+import org.apache.nifi.minifi.commons.api.MiNiFiProperties;
+import org.eclipse.jetty.io.RuntimeIOException;
import org.slf4j.Logger;
public class MiNiFiConfigurationChangeListener implements
ConfigurationChangeListener {
+ private static final ReentrantLock handlingLock = new ReentrantLock();
+
private final RunMiNiFi runner;
private final Logger logger;
private final BootstrapFileProvider bootstrapFileProvider;
+ private final FlowEnrichService flowEnrichService;
- private static final ReentrantLock handlingLock = new ReentrantLock();
-
- public MiNiFiConfigurationChangeListener(RunMiNiFi runner, Logger logger,
BootstrapFileProvider bootstrapFileProvider) {
+ public MiNiFiConfigurationChangeListener(RunMiNiFi runner, Logger logger,
BootstrapFileProvider bootstrapFileProvider, FlowEnrichService
flowEnrichService) {
this.runner = runner;
this.logger = logger;
this.bootstrapFileProvider = bootstrapFileProvider;
+ this.flowEnrichService = flowEnrichService;
}
@Override
- public void handleChange(InputStream configInputStream) throws
ConfigurationChangeException {
+ public String getDescriptor() {
+ return "MiNiFiConfigurationChangeListener";
+ }
+
+ @Override
+ public void handleChange(InputStream flowConfigInputStream) throws
ConfigurationChangeException {
logger.info("Received notification of a change");
if (!handlingLock.tryLock()) {
throw new ConfigurationChangeException("Instance is already
handling another change");
}
- // Store the incoming stream as a byte array to be shared among
components that need it
+
+ Path currentFlowConfigFile = null;
+ Path backupFlowConfigFile = null;
+ Path currentRawFlowConfigFile = null;
+ Path backupRawFlowConfigFile = null;
try {
Properties bootstrapProperties =
bootstrapFileProvider.getBootstrapProperties();
- File configFile = new
File(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY));
- File swapConfigFile = bootstrapFileProvider.getConfigYmlSwapFile();
- logger.info("Persisting old configuration to {}",
swapConfigFile.getAbsolutePath());
+ currentFlowConfigFile =
Path.of(bootstrapProperties.getProperty(MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey())).toAbsolutePath();
+ backupFlowConfigFile = Path.of(currentFlowConfigFile +
BACKUP_EXTENSION);
+ String currentFlowConfigFileBaseName =
FilenameUtils.getBaseName(currentFlowConfigFile.toString());
+ currentRawFlowConfigFile =
currentFlowConfigFile.getParent().resolve(currentFlowConfigFileBaseName +
RAW_EXTENSION);
+ backupRawFlowConfigFile =
currentFlowConfigFile.getParent().resolve(currentFlowConfigFileBaseName +
RAW_EXTENSION + BACKUP_EXTENSION);
- try (FileInputStream configFileInputStream = new
FileInputStream(configFile)) {
- Files.copy(configFileInputStream, swapConfigFile.toPath(),
REPLACE_EXISTING);
- }
+ backup(currentFlowConfigFile, backupFlowConfigFile);
+ backup(currentRawFlowConfigFile, backupRawFlowConfigFile);
- // write out new config to file
- Files.copy(configInputStream, configFile.toPath(),
REPLACE_EXISTING);
-
- // Create an input stream to feed to the config transformer
- try (FileInputStream newConfigIs = new
FileInputStream(configFile)) {
- try {
- String confDir =
bootstrapProperties.getProperty(CONF_DIR_KEY);
- transformConfigurationFiles(confDir, newConfigIs,
configFile, swapConfigFile);
- } catch (Exception e) {
- logger.debug("Transformation of new config file failed
after swap file was created, deleting it.");
- if (!swapConfigFile.delete()) {
- logger.warn("The swap file failed to delete after a
failed handling of a change. It should be cleaned up manually.");
- }
- throw e;
- }
- }
+ byte[] rawFlow = toByteArray(flowConfigInputStream);
+ byte[] enrichedFlow = flowEnrichService.enrichFlow(rawFlow);
+ persist(enrichedFlow, currentFlowConfigFile, true);
+ restartInstance();
+ setActiveFlowReference(wrap(rawFlow));
+ persist(rawFlow, currentRawFlowConfigFile, false);
Review Comment:
If this throws an exception, the setActiveFlowReference won't be reverted, if I
see correctly.
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java:
##########
@@ -90,261 +89,233 @@ public class PullHttpChangeIngestor extends
AbstractPullChangeIngestor {
public static final String DIFFERENTIATOR_KEY = PULL_HTTP_BASE_KEY +
".differentiator";
public static final String USE_ETAG_KEY = PULL_HTTP_BASE_KEY + ".use.etag";
public static final String OVERRIDE_SECURITY = PULL_HTTP_BASE_KEY +
".override.security";
+ public static final String HTTP_HEADERS = PULL_HTTP_BASE_KEY + ".headers";
+
+ private static final Logger logger =
LoggerFactory.getLogger(PullHttpChangeIngestor.class);
+
+ private static final Map<String, Supplier<Differentiator<ByteBuffer>>>
DIFFERENTIATOR_CONSTRUCTOR_MAP = Map.of(
+ WHOLE_CONFIG_KEY,
WholeConfigDifferentiator::getByteBufferDifferentiator
+ );
+ private static final int NOT_MODIFIED_STATUS_CODE = 304;
+ private static final String DEFAULT_CONNECT_TIMEOUT_MS = "5000";
+ private static final String DEFAULT_READ_TIMEOUT_MS = "15000";
+ private static final String DOUBLE_QUOTES = "\"";
+ private static final String ETAG_HEADER = "ETag";
+ private static final String PROXY_AUTHORIZATION_HEADER =
"Proxy-Authorization";
+ private static final String DEFAULT_PATH = "/";
+ private static final int BAD_REQUEST_STATUS_CODE = 400;
+ private static final String IF_NONE_MATCH_HEADER_KEY = "If-None-Match";
+ private static final String HTTP_HEADERS_SEPARATOR = ",";
+ private static final String HTTP_HEADER_KEY_VALUE_SEPARATOR = ":";
private final AtomicReference<OkHttpClient> httpClientReference = new
AtomicReference<>();
private final AtomicReference<Integer> portReference = new
AtomicReference<>();
private final AtomicReference<String> hostReference = new
AtomicReference<>();
private final AtomicReference<String> pathReference = new
AtomicReference<>();
private final AtomicReference<String> queryReference = new
AtomicReference<>();
+ private final AtomicReference<Map<String, String>> httpHeadersReference =
new AtomicReference<>();
+
private volatile Differentiator<ByteBuffer> differentiator;
private volatile String connectionScheme;
private volatile String lastEtag = "";
private volatile boolean useEtag = false;
- public PullHttpChangeIngestor() {
- logger = LoggerFactory.getLogger(PullHttpChangeIngestor.class);
- }
-
@Override
public void initialize(Properties properties, ConfigurationFileHolder
configurationFileHolder, ConfigurationChangeNotifier
configurationChangeNotifier) {
super.initialize(properties, configurationFileHolder,
configurationChangeNotifier);
-
pollingPeriodMS.set(Integer.parseInt(properties.getProperty(PULL_HTTP_POLLING_PERIOD_KEY,
DEFAULT_POLLING_PERIOD)));
+
pollingPeriodMS.set(Integer.parseInt(properties.getProperty(PULL_HTTP_POLLING_PERIOD_KEY,
DEFAULT_POLLING_PERIOD_MILLISECONDS)));
if (pollingPeriodMS.get() < 1) {
- throw new IllegalArgumentException("Property, " +
PULL_HTTP_POLLING_PERIOD_KEY + ", for the polling period ms must be set with a
positive integer.");
- }
-
- final String host = properties.getProperty(HOST_KEY);
- if (host == null || host.isEmpty()) {
- throw new IllegalArgumentException("Property, " + HOST_KEY + ",
for the hostname to pull configurations from must be specified.");
- }
-
- final String path = properties.getProperty(PATH_KEY, "/");
- final String query = properties.getProperty(QUERY_KEY, "");
-
- final String portString = (String) properties.get(PORT_KEY);
- final Integer port;
- if (portString == null) {
- throw new IllegalArgumentException("Property, " + PORT_KEY + ",
for the hostname to pull configurations from must be specified.");
- } else {
- port = Integer.parseInt(portString);
+ throw new IllegalArgumentException("Property, " +
PULL_HTTP_POLLING_PERIOD_KEY + ", for the polling period ms must be set with a
positive integer");
}
- portReference.set(port);
+ String host = ofNullable(properties.getProperty(HOST_KEY))
+ .filter(StringUtils::isNotBlank)
+ .orElseThrow(() -> new IllegalArgumentException("Property, " +
HOST_KEY + ", for the hostname to pull configurations from must be specified"));
+ String path = properties.getProperty(PATH_KEY, DEFAULT_PATH);
+ String query = properties.getProperty(QUERY_KEY, EMPTY);
+ Map<String, String> httpHeaders =
ofNullable(properties.getProperty(HTTP_HEADERS))
+ .filter(StringUtils::isNotBlank)
+ .map(headers -> headers.split(HTTP_HEADERS_SEPARATOR))
+ .map(Arrays::stream)
Review Comment:
.stream().flatMap(Arrays::stream)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]