http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/JacksonResponse.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/JacksonResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/JacksonResponse.java new file mode 100644 index 0000000..e4eb45c --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/JacksonResponse.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.nifi.cluster.coordination.http.replication.okhttp;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.lang.annotation.Annotation;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

import javax.ws.rs.core.EntityTag;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.Link;
import javax.ws.rs.core.Link.Builder;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.NewCookie;
import javax.ws.rs.core.Response;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * A JAX-RS {@link Response} backed by an in-memory byte array whose entity is parsed with Jackson.
 * <p>
 * The status code, headers, location and body are supplied at construction time. Only the
 * {@code Class}-based {@link #readEntity(Class)} is supported; the {@code GenericType} and
 * annotation-aware variants throw {@link UnsupportedOperationException}. Cookies, links, entity tag,
 * date, last-modified and language are not populated.
 */
public class JacksonResponse extends Response {
    private final ObjectMapper codec;
    private final byte[] responseBody;
    private final MultivaluedMap<String, String> responseHeaders;
    private final URI location;
    private final int statusCode;
    private final Runnable closeCallback;

    private final JsonFactory jsonFactory = new JsonFactory();

    /**
     * @param codec the mapper used to bind the JSON body to Java objects
     * @param responseBody the raw response body (treated as having no entity when {@code null} or empty)
     * @param responseHeaders the response headers
     * @param location the value returned by {@link #getLocation()}
     * @param statusCode the HTTP status code
     * @param closeCallback invoked by {@link #close()}; may be {@code null}
     */
    public JacksonResponse(final ObjectMapper codec, final byte[] responseBody, final MultivaluedMap<String, String> responseHeaders, final URI location, final int statusCode,
            final Runnable closeCallback) {
        this.codec = codec;
        this.responseBody = responseBody;
        this.responseHeaders = responseHeaders;
        this.location = location;
        this.statusCode = statusCode;
        this.closeCallback = closeCallback;
    }

    @Override
    public int getStatus() {
        return statusCode;
    }

    /**
     * Returns the status as a {@link StatusType}. Codes that {@link Status} does not define no longer
     * yield {@code null}; a minimal {@code StatusType} carrying the numeric code and its family is
     * returned instead, so callers can safely chain {@code getStatusInfo().getFamily()}.
     */
    @Override
    public StatusType getStatusInfo() {
        final int code = getStatus();
        final Status knownStatus = Status.fromStatusCode(code);
        if (knownStatus != null) {
            return knownStatus;
        }

        return new StatusType() {
            @Override
            public int getStatusCode() {
                return code;
            }

            @Override
            public Status.Family getFamily() {
                return Status.Family.familyOf(code);
            }

            @Override
            public String getReasonPhrase() {
                // No reason phrase is known for a code outside the Status enum.
                return "";
            }
        };
    }

    /**
     * Parses the body as JSON into an untyped object.
     *
     * @throws RuntimeException if the body cannot be parsed
     */
    @Override
    public Object getEntity() {
        try {
            final JsonParser parser = jsonFactory.createParser(responseBody);
            parser.setCodec(codec);
            return parser.readValueAs(Object.class);
        } catch (final Exception e) {
            throw new RuntimeException("Failed to parse response", e);
        }
    }

    /**
     * Reads the body as the requested type. {@link InputStream} yields a stream over the raw bytes,
     * {@link String} decodes the bytes as UTF-8, and any other type is bound from JSON.
     *
     * @throws RuntimeException if the body cannot be parsed as {@code entityType}
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> T readEntity(Class<T> entityType) {
        if (InputStream.class.equals(entityType)) {
            return (T) new ByteArrayInputStream(responseBody);
        }

        if (String.class.equals(entityType)) {
            return (T) new String(responseBody, StandardCharsets.UTF_8);
        }

        try {
            final JsonParser parser = jsonFactory.createParser(responseBody);
            parser.setCodec(codec);
            return parser.readValueAs(entityType);
        } catch (final Exception e) {
            throw new RuntimeException("Failed to parse response as entity of type " + entityType, e);
        }
    }

    @Override
    public <T> T readEntity(GenericType<T> entityType) {
        throw new UnsupportedOperationException();
    }

    @Override
    public <T> T readEntity(Class<T> entityType, Annotation[] annotations) {
        throw new UnsupportedOperationException();
    }

    @Override
    public <T> T readEntity(GenericType<T> entityType, Annotation[] annotations) {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean hasEntity() {
        return responseBody != null && responseBody.length > 0;
    }

    @Override
    public boolean bufferEntity() {
        // The body is already held as a byte array, so there is nothing further to buffer.
        return true;
    }

    @Override
    public void close() {
        if (closeCallback != null) {
            closeCallback.run();
        }
    }

    @Override
    public MediaType getMediaType() {
        return MediaType.APPLICATION_JSON_TYPE;
    }

    @Override
    public Locale getLanguage() {
        return null;
    }

    @Override
    public int getLength() {
        return responseBody == null ? 0 : responseBody.length;
    }

    /**
     * Returns the upper-cased, comma-separated tokens of the {@code Allow} header, or an empty set
     * when the header is absent or blank.
     */
    @Override
    public Set<String> getAllowedMethods() {
        final String allowHeader = getHeaderString("Allow");
        if (allowHeader == null || allowHeader.trim().isEmpty()) {
            return Collections.emptySet();
        }

        final Set<String> allowed = new HashSet<>();
        for (final String allow : allowHeader.split(",")) {
            // Locale.ROOT keeps the result stable regardless of the JVM default locale
            // (for example, the Turkish locale maps 'i' to a dotted capital I).
            final String trimmed = allow.trim().toUpperCase(Locale.ROOT);
            if (!trimmed.isEmpty()) {
                allowed.add(trimmed);
            }
        }

        return allowed;
    }

    @Override
    public Map<String, NewCookie> getCookies() {
        return Collections.emptyMap();
    }

    @Override
    public EntityTag getEntityTag() {
        return null;
    }

    @Override
    public Date getDate() {
        return null;
    }

    @Override
    public Date getLastModified() {
        return null;
    }

    @Override
    public URI getLocation() {
        return location;
    }

    @Override
    public Set<Link> getLinks() {
        return Collections.emptySet();
    }

    @Override
    public boolean hasLink(String relation) {
        return false;
    }

    @Override
    public Link getLink(String relation) {
        return null;
    }

    @Override
    public Builder getLinkBuilder(String relation) {
        return null;
    }

    /**
     * Returns the same view as {@link #getHeaders()}; JAX-RS defines the two methods as equivalent.
     */
    @Override
    public MultivaluedMap<String, Object> getMetadata() {
        return getHeaders();
    }

    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public MultivaluedMap<String, Object> getHeaders() {
        // The map only ever holds String values; the raw cast merely widens the declared value type.
        return (MultivaluedMap) responseHeaders;
    }

    @Override
    public MultivaluedMap<String, String> getStringHeaders() {
        return responseHeaders;
    }

    @Override
    public String getHeaderString(String name) {
        return responseHeaders.getFirst(name);
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/JsonEntitySerializer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/JsonEntitySerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/JsonEntitySerializer.java new file mode 100644 index 0000000..d652291 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/JsonEntitySerializer.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.nifi.cluster.coordination.http.replication.okhttp;

import java.io.IOException;
import java.io.OutputStream;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * {@link EntitySerializer} that writes an entity as JSON through a Jackson {@link ObjectMapper}.
 */
public class JsonEntitySerializer implements EntitySerializer {
    private final ObjectMapper jsonCodec;

    /**
     * @param jsonCodec the mapper installed as the codec of every generator created by this serializer
     */
    public JsonEntitySerializer(final ObjectMapper jsonCodec) {
        this.jsonCodec = jsonCodec;
    }

    /**
     * Writes {@code entity} to {@code out} as JSON. The generator is flushed but not closed by this method.
     *
     * @param entity the object to serialize
     * @param out the destination stream
     * @throws IOException if the generator cannot be created or the entity cannot be written
     */
    @Override
    public void serialize(final Object entity, final OutputStream out) throws IOException {
        final JsonGenerator jsonWriter = new JsonFactory().createGenerator(out);
        jsonWriter.setCodec(jsonCodec);

        jsonWriter.writeObject(entity);
        jsonWriter.flush();
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpPreparedRequest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpPreparedRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpPreparedRequest.java new file mode 100644 index 0000000..9b9a699 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpPreparedRequest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.cluster.coordination.http.replication.okhttp;

import java.util.Map;

import org.apache.nifi.cluster.coordination.http.replication.PreparedRequest;

import okhttp3.RequestBody;

/**
 * A {@link PreparedRequest} that additionally carries the OkHttp {@link RequestBody} built for the entity.
 */
public class OkHttpPreparedRequest implements PreparedRequest {
    private final String method;
    private final Map<String, String> headers;
    private final Object entity;
    private final RequestBody requestBody;

    /**
     * @param method the HTTP method
     * @param headers the request headers
     * @param entity the entity the request was prepared from
     * @param requestBody the OkHttp body built for the request
     */
    public OkHttpPreparedRequest(final String method, final Map<String, String> headers, final Object entity, final RequestBody requestBody) {
        this.method = method;
        this.headers = headers;
        this.entity = entity;
        this.requestBody = requestBody;
    }

    @Override
    public String getMethod() {
        return method;
    }

    @Override
    public Map<String, String> getHeaders() {
        return headers;
    }

    @Override
    public Object getEntity() {
        return entity;
    }

    /**
     * @return the OkHttp request body supplied at construction time
     */
    public RequestBody getRequestBody() {
        return requestBody;
    }

    /**
     * Describes the request by method and headers only; the entity and body are not included.
     */
    @Override
    public String toString() {
        final StringBuilder description = new StringBuilder("OkHttpPreparedRequest[method=");
        description.append(method);
        description.append(", headers=").append(headers);
        description.append("]");
        return description.toString();
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java ---------------------------------------------------------------------- diff
--git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java new file mode 100644 index 0000000..3df44ec --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.nifi.cluster.coordination.http.replication.okhttp;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.security.KeyStore;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;

import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.cluster.coordination.http.replication.HttpReplicationClient;
import org.apache.nifi.cluster.coordination.http.replication.PreparedRequest;
import org.apache.nifi.framework.security.util.SslContextFactory;
import org.apache.nifi.remote.protocol.http.HttpHeaders;
import org.apache.nifi.stream.io.GZIPOutputStream;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StreamUtils;

import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonInclude.Value;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;

import okhttp3.Call;
import okhttp3.Headers;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;

/**
 * {@link HttpReplicationClient} that replicates requests to other nodes using OkHttp.
 * <p>
 * The request entity is serialized (as XML or JSON, optionally gzip-compressed) when the request is
 * prepared. Each node response is read fully into memory, gunzipped when its {@code Content-Encoding}
 * says so, and returned as a {@link JacksonResponse}.
 */
public class OkHttpReplicationClient implements HttpReplicationClient {
    private static final Logger logger = LoggerFactory.getLogger(OkHttpReplicationClient.class);
    private static final Set<String> gzipEncodings = Stream.of("gzip", "x-gzip").collect(Collectors.toSet());

    private final EntitySerializer jsonSerializer;
    private final EntitySerializer xmlSerializer;

    private final ObjectMapper jsonCodec = new ObjectMapper();
    private final OkHttpClient okHttpClient;

    /**
     * @param properties supplies the connection/read timeouts and the keystore/truststore settings
     * @param hostnameVerifier installed on the HTTP client when non-null
     */
    public OkHttpReplicationClient(final NiFiProperties properties, final HostnameVerifier hostnameVerifier) {
        jsonCodec.setDefaultPropertyInclusion(Value.construct(Include.NON_NULL, Include.ALWAYS));
        jsonCodec.setAnnotationIntrospector(new JaxbAnnotationIntrospector(jsonCodec.getTypeFactory()));

        jsonSerializer = new JsonEntitySerializer(jsonCodec);
        xmlSerializer = new XmlEntitySerializer();

        okHttpClient = createOkHttpClient(properties, hostnameVerifier);
    }

    /**
     * Serializes the entity once and packages it with the method and headers. When the headers'
     * {@code Accept-Encoding} lists a gzip encoding, the body is compressed and a
     * {@code Content-Encoding: gzip} header is added to a copy of the headers.
     */
    @Override
    public PreparedRequest prepareRequest(final String method, final Map<String, String> headers, final Object entity) {
        final boolean gzip = isUseGzip(headers);
        final RequestBody requestBody = createRequestBody(headers, entity, gzip);

        final Map<String, String> updatedHeaders = gzip ? updateHeadersForGzip(headers) : headers;
        return new OkHttpPreparedRequest(method, updatedHeaders, entity, requestBody);
    }

    /**
     * Sends a request previously returned by {@link #prepareRequest(String, Map, Object)} to the given URI.
     *
     * @throws NullPointerException if {@code request} is null
     * @throws IllegalArgumentException if {@code request} was not prepared by this kind of client
     * @throws IOException if the HTTP call or reading the response fails
     */
    @Override
    public Response replicate(final PreparedRequest request, final String uri) throws IOException {
        if (!(Objects.requireNonNull(request) instanceof OkHttpPreparedRequest)) {
            throw new IllegalArgumentException("Replication Client is only able to replicate requests that the client itself has prepared");
        }

        return replicate((OkHttpPreparedRequest) request, uri);
    }

    /** Executes the call synchronously and wraps the fully-read response. */
    private Response replicate(final OkHttpPreparedRequest request, final String uri) throws IOException {
        logger.debug("Replicating request {} to {}", request, uri);
        final Call call = createCall(request, uri);
        final okhttp3.Response callResponse = call.execute();

        final byte[] responseBytes = getResponseBytes(callResponse);
        final MultivaluedMap<String, String> responseHeaders = getHeaders(callResponse);
        logger.debug("Received response code {} with headers {} for request {} to {}", callResponse.code(), responseHeaders, request, uri);

        return new JacksonResponse(jsonCodec, responseBytes, responseHeaders, URI.create(uri), callResponse.code(), callResponse::close);
    }

    /** Copies every header name and all of its values from the OkHttp response into a JAX-RS multivalued map. */
    private MultivaluedMap<String, String> getHeaders(final okhttp3.Response callResponse) {
        final Headers headers = callResponse.headers();
        final MultivaluedMap<String, String> headerMap = new MultivaluedHashMap<>();
        for (final String name : headers.names()) {
            final List<String> values = headers.values(name);
            headerMap.addAll(name, values);
        }

        return headerMap;
    }

    /** Reads the whole response body, decompressing it when the Content-Encoding header is a gzip encoding. */
    private byte[] getResponseBytes(final okhttp3.Response callResponse) throws IOException {
        final byte[] rawBytes = callResponse.body().bytes();

        final String contentEncoding = callResponse.header("Content-Encoding");
        if (gzipEncodings.contains(contentEncoding)) {
            try (final InputStream gzipIn = new GZIPInputStream(new ByteArrayInputStream(rawBytes));
                 final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {

                StreamUtils.copy(gzipIn, baos);
                return baos.toByteArray();
            }
        } else {
            return rawBytes;
        }
    }

    /** Builds the OkHttp call: URL, method (with a body only for POST/PUT/PATCH) and headers. */
    private Call createCall(final OkHttpPreparedRequest request, final String uri) {
        Request.Builder requestBuilder = new Request.Builder();

        final HttpUrl url = buildUrl(request, uri);
        requestBuilder = requestBuilder.url(url);

        // build the request body; Locale.ROOT keeps the method name independent of the JVM default locale
        final String method = request.getMethod().toUpperCase(Locale.ROOT);
        switch (method) {
            case "POST":
            case "PUT":
            case "PATCH":
                requestBuilder = requestBuilder.method(method, request.getRequestBody());
                break;
            default:
                requestBuilder = requestBuilder.method(method, null);
                break;
        }

        // Add appropriate headers
        for (final Map.Entry<String, String> header : request.getHeaders().entrySet()) {
            requestBuilder = requestBuilder.addHeader(header.getKey(), header.getValue());
        }

        // Build the request
        final Request okHttpRequest = requestBuilder.build();
        return okHttpClient.newCall(okHttpRequest);
    }

    /**
     * Parses the target URI and, for DELETE/HEAD/GET/OPTIONS requests whose entity is a
     * {@link MultivaluedMap}, appends each entry of that map as query parameters.
     *
     * @throws IllegalArgumentException if {@code uri} cannot be parsed as an HTTP(S) URL
     */
    @SuppressWarnings("unchecked")
    private HttpUrl buildUrl(final OkHttpPreparedRequest request, final String uri) {
        final HttpUrl parsedUrl = HttpUrl.parse(uri);
        if (parsedUrl == null) {
            // HttpUrl.parse signals a malformed URL with null rather than an exception
            throw new IllegalArgumentException("Cannot replicate request because the URI is not a valid HTTP or HTTPS URL: " + uri);
        }

        HttpUrl.Builder urlBuilder = parsedUrl.newBuilder();
        switch (request.getMethod().toUpperCase(Locale.ROOT)) {
            case HttpMethod.DELETE:
            case HttpMethod.HEAD:
            case HttpMethod.GET:
            case HttpMethod.OPTIONS:
                if (request.getEntity() instanceof MultivaluedMap) {
                    final MultivaluedMap<String, String> entityMap = (MultivaluedMap<String, String>) request.getEntity();

                    for (final Entry<String, List<String>> queryEntry : entityMap.entrySet()) {
                        final String queryName = queryEntry.getKey();
                        for (final String queryValue : queryEntry.getValue()) {
                            urlBuilder = urlBuilder.addQueryParameter(queryName, queryValue);
                        }
                    }
                }

                break;
        }

        return urlBuilder.build();
    }

    /** Serializes the entity and wraps the bytes in a request body typed with the request's content type. */
    private RequestBody createRequestBody(final Map<String, String> headers, final Object entity, final boolean gzip) {
        final String contentType = getContentType(headers, "application/json");
        final byte[] serialized = serializeEntity(entity, contentType, gzip);

        final MediaType mediaType = MediaType.parse(contentType);
        return RequestBody.create(mediaType, serialized);
    }

    /** Returns the value of the first header whose name equals "content-type" ignoring case, else the default. */
    private String getContentType(final Map<String, String> headers, final String defaultValue) {
        for (final Map.Entry<String, String> entry : headers.entrySet()) {
            if (entry.getKey().equalsIgnoreCase("content-type")) {
                return entry.getValue();
            }
        }

        return defaultValue;
    }

    /** Serializes the entity to a byte array, gzip-compressing the output when requested. */
    private byte[] serializeEntity(final Object entity, final String contentType, final boolean gzip) {
        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
             final OutputStream out = gzip ? new GZIPOutputStream(baos, 1) : baos) {

            getSerializer(contentType).serialize(entity, out);
            // Close explicitly before reading the buffer so the stream has finished writing into it.
            out.close();

            return baos.toByteArray();
        } catch (final IOException e) {
            // This should never happen with a ByteArrayOutputStream
            throw new RuntimeException("Failed to serialize entity for cluster replication", e);
        }
    }

    /**
     * Picks the serializer for a content type: XML for exactly "application/xml" (case-insensitive),
     * JSON for everything else.
     * NOTE(review): a content type carrying parameters (e.g. a charset) does not match
     * "application/xml" and therefore selects JSON - confirm this is intended.
     */
    private EntitySerializer getSerializer(final String contentType) {
        switch (contentType.toLowerCase(Locale.ROOT)) {
            case "application/xml":
                return xmlSerializer;
            case "application/json":
            default:
                return jsonSerializer;
        }
    }

    /** Returns the headers with "Content-Encoding: gzip" set, copying the map only when a change is needed. */
    private Map<String, String> updateHeadersForGzip(final Map<String, String> headers) {
        final String encodingHeader = headers.get("Content-Encoding");
        if (gzipEncodings.contains(encodingHeader)) {
            return headers;
        }

        final Map<String, String> updatedHeaders = new HashMap<>(headers);
        updatedHeaders.put("Content-Encoding", "gzip");
        return updatedHeaders;
    }

    /** Returns true when any comma-separated token of the Accept-Encoding header is a gzip encoding. */
    private boolean isUseGzip(final Map<String, String> headers) {
        final String rawAcceptEncoding = headers.get(HttpHeaders.ACCEPT_ENCODING);
        if (rawAcceptEncoding == null) {
            return false;
        }

        return Stream.of(rawAcceptEncoding.split(","))
            .map(String::trim)
            .filter(enc -> StringUtils.isNotEmpty(enc))
            .map(enc -> enc.toLowerCase(Locale.ROOT))
            .anyMatch(gzipEncodings::contains);
    }

    /** Creates the HTTP client with the cluster node timeouts and, when available, the TLS configuration. */
    private OkHttpClient createOkHttpClient(final NiFiProperties properties, final HostnameVerifier hostnameVerifier) {
        final String connectionTimeout = properties.getClusterNodeConnectionTimeout();
        final long connectionTimeoutMs = FormatUtils.getTimeDuration(connectionTimeout, TimeUnit.MILLISECONDS);
        final String readTimeout = properties.getClusterNodeReadTimeout();
        final long readTimeoutMs = FormatUtils.getTimeDuration(readTimeout, TimeUnit.MILLISECONDS);

        final OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient().newBuilder();
        okHttpClientBuilder.connectTimeout(connectionTimeoutMs, TimeUnit.MILLISECONDS);
        okHttpClientBuilder.readTimeout(readTimeoutMs, TimeUnit.MILLISECONDS);
        okHttpClientBuilder.followRedirects(true);

        final Tuple<SSLSocketFactory, X509TrustManager> tuple = createSslSocketFactory(properties);
        if (tuple != null) {
            okHttpClientBuilder.sslSocketFactory(tuple.getKey(), tuple.getValue());
        }

        if (hostnameVerifier != null) {
            okHttpClientBuilder.hostnameVerifier(hostnameVerifier);
        }

        return okHttpClientBuilder.build();
    }

    /**
     * Builds the socket factory and trust manager from the keystore and truststore named in the properties.
     *
     * @return the socket factory paired with its X509 trust manager, or {@code null} when
     *         {@link SslContextFactory#createSslContext(NiFiProperties)} returns no context
     * @throws RuntimeException wrapping any failure to load the stores or initialize the context
     */
    private Tuple<SSLSocketFactory, X509TrustManager> createSslSocketFactory(final NiFiProperties properties) {
        final SSLContext sslContext = SslContextFactory.createSslContext(properties);

        if (sslContext == null) {
            return null;
        }

        try {
            final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("X509");

            // load the keystore
            final String keystoreLocation = properties.getProperty(NiFiProperties.SECURITY_KEYSTORE);
            final String keystorePass = properties.getProperty(NiFiProperties.SECURITY_KEYSTORE_PASSWD);
            final String keystoreType = properties.getProperty(NiFiProperties.SECURITY_KEYSTORE_TYPE);

            final KeyStore keyStore = KeyStore.getInstance(keystoreType);
            try (final FileInputStream keyStoreStream = new FileInputStream(keystoreLocation)) {
                keyStore.load(keyStoreStream, keystorePass.toCharArray());
            }

            keyManagerFactory.init(keyStore, keystorePass.toCharArray());
            final KeyManager[] keyManagers = keyManagerFactory.getKeyManagers();

            // load the truststore
            final String truststoreLocation = properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE);
            final String truststorePass = properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_PASSWD);
            final String truststoreType = properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_TYPE);

            final KeyStore truststore = KeyStore.getInstance(truststoreType);
            // try-with-resources so the truststore file handle is released once it has been read
            try (final FileInputStream truststoreStream = new FileInputStream(truststoreLocation)) {
                truststore.load(truststoreStream, truststorePass.toCharArray());
            }
            trustManagerFactory.init(truststore);

            // TrustManagerFactory.getTrustManagers returns a trust manager for each type of trust material. Since we are getting a trust manager factory that uses "X509"
            // as its trust management algorithm, we are able to grab the first (and thus the most preferred) and use it as our x509 Trust Manager
            //
            // https://docs.oracle.com/javase/8/docs/api/javax/net/ssl/TrustManagerFactory.html#getTrustManagers--
            final TrustManager[] trustManagers = trustManagerFactory.getTrustManagers();
            if (trustManagers == null || trustManagers.length == 0 || !(trustManagers[0] instanceof X509TrustManager)) {
                throw new IllegalStateException("No X509 trust manager is available from the configured truststore");
            }
            final X509TrustManager x509TrustManager = (X509TrustManager) trustManagers[0];

            sslContext.init(keyManagers, trustManagers, null);

            final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
            return new Tuple<>(sslSocketFactory, x509TrustManager);
        } catch (final Exception e) {
            // keep the cause so the underlying keystore/truststore problem is visible in the stack trace
            throw new RuntimeException("Failed to create SSL Socket Factory for replicating requests across the cluster", e);
        }
    }

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/XmlEntitySerializer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/XmlEntitySerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/XmlEntitySerializer.java new file mode 100644 index 0000000..bc7fdc4 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/XmlEntitySerializer.java @@ -0,0 +1,60 @@ +/* +` *
Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.cluster.coordination.http.replication.okhttp;

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;

/**
 * {@link EntitySerializer} that marshals an entity to XML with JAXB, caching one
 * {@link JAXBContext} per entity class.
 */
public class XmlEntitySerializer implements EntitySerializer {
    private final ConcurrentMap<Class<?>, JAXBContext> jaxbContextCache = new ConcurrentHashMap<>();

    /**
     * Marshals {@code entity} to {@code out} using the JAXB context for the entity's runtime class.
     *
     * @param entity the object to marshal
     * @param out the destination stream
     * @throws IOException wrapping any {@link JAXBException} raised while marshalling
     */
    @Override
    public void serialize(final Object entity, final OutputStream out) throws IOException {
        try {
            final Marshaller marshaller = getJaxbContext(entity.getClass()).createMarshaller();
            marshaller.marshal(entity, out);
        } catch (final JAXBException e) {
            throw new IOException(e);
        }
    }

    /**
     * Returns the cached context for the type, creating and caching it on first use. Every caller
     * receives the instance that is stored in the cache, even when several threads ask for the same
     * type at once.
     */
    private JAXBContext getJaxbContext(final Class<?> entityType) {
        // Plain lookup first so the common, already-cached case never enters computeIfAbsent.
        final JAXBContext cached = jaxbContextCache.get(entityType);
        if (cached != null) {
            return cached;
        }

        return jaxbContextCache.computeIfAbsent(entityType, this::createJaxbContext);
    }

    /** Creates a context for the type, converting the checked JAXB failure into a RuntimeException. */
    private JAXBContext createJaxbContext(final Class<?> entityType) {
        try {
            return JAXBContext.newInstance(entityType);
        } catch (final JAXBException e) {
            throw new RuntimeException("Failed to create JAXBContext for Entity Type [" + entityType + "] so could not parse incoming request", e);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMerger.java index fd9a387..f14b696 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMerger.java @@ -16,6 +16,12 @@ */ package org.apache.nifi.cluster.manager; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.web.api.dto.ControllerServiceDTO; @@ -25,11 +31,6 @@ import org.apache.nifi.web.api.dto.PropertyDescriptorDTO; import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - public class ControllerServiceEntityMerger implements
ComponentEntityMerger<ControllerServiceEntity> { /** @@ -112,6 +113,12 @@ public class ControllerServiceEntityMerger implements ComponentEntityMerger<Cont clientDto.setState(state); } + final Set<String> statuses = dtoMap.values().stream() + .map(ControllerServiceDTO::getValidationStatus) + .collect(Collectors.toSet()); + + clientDto.setValidationStatus(ErrorMerger.mergeValidationStatus(statuses)); + // set the merged the validation errors clientDto.setValidationErrors(ErrorMerger.normalizedMergedErrors(validationErrorMap, dtoMap.size())); } http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ErrorMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ErrorMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ErrorMerger.java index 1b51e16..9eec1d8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ErrorMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ErrorMerger.java @@ -16,13 +16,14 @@ */ package org.apache.nifi.cluster.manager; -import org.apache.nifi.cluster.protocol.NodeIdentifier; - import java.util.Collection; import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.ProcessorDTO; + public final class ErrorMerger { private ErrorMerger() {} @@ -37,7 +38,7 @@ public final class ErrorMerger { public static void mergeErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, final NodeIdentifier nodeId, final 
Collection<String> nodeErrors) { if (nodeErrors != null) { nodeErrors.stream().forEach( - err -> validationErrorMap.computeIfAbsent(err, k -> new HashSet<NodeIdentifier>()) + err -> validationErrorMap.computeIfAbsent(err, k -> new HashSet<>()) .add(nodeId)); } } @@ -63,4 +64,28 @@ public final class ErrorMerger { } return normalizedErrors; } + + /** + * Determines the appropriate Validation Status to use as the aggregate for the given validation statuses + * + * @param validationStatuses the components' validation statuses + * @return {@link ProcessorDTO#INVALID} if any status is invalid, else {@link ProcessorDTO#VALIDATING} if any status is validating, else {@link ProcessorDTO#VALID} + */ + public static String mergeValidationStatus(final Collection<String> validationStatuses) { + final boolean anyInvalid = validationStatuses.stream() + .anyMatch(status -> ProcessorDTO.INVALID.equalsIgnoreCase(status)); + + if (anyInvalid) { + return ProcessorDTO.INVALID; + } + + final boolean anyValidating = validationStatuses.stream() + .anyMatch(status -> ProcessorDTO.VALIDATING.equalsIgnoreCase(status)); + + if (anyValidating) { + return ProcessorDTO.VALIDATING; + } + + return ProcessorDTO.VALID; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java index dffac49..55dce23 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java @@ -26,6 +26,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; public class ProcessorEntityMerger implements ComponentEntityMerger<ProcessorEntity>, ComponentEntityStatusMerger<ProcessorStatusDTO> { @Override @@ -45,6 +46,7 @@ public class ProcessorEntityMerger implements ComponentEntityMerger<ProcessorEnt * @param clientEntity the entity being returned to the client * @param entityMap all node responses */ + @Override public void mergeComponents(final ProcessorEntity clientEntity, final Map<NodeIdentifier, ProcessorEntity> entityMap) { final ProcessorDTO clientDto = clientEntity.getComponent(); final Map<NodeIdentifier, ProcessorDTO> dtoMap = new HashMap<>(); @@ -106,6 +108,11 @@ public class ProcessorEntityMerger implements ComponentEntityMerger<ProcessorEnt } } + final Set<String> statuses = dtoMap.values().stream() + .map(ProcessorDTO::getValidationStatus) + .collect(Collectors.toSet()); + clientDto.setValidationStatus(ErrorMerger.mergeValidationStatus(statuses)); + // set the merged the validation errors clientDto.setValidationErrors(ErrorMerger.normalizedMergedErrors(validationErrorMap, dtoMap.size())); } http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ReportingTaskEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ReportingTaskEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ReportingTaskEntityMerger.java index 
e6bfba3..b439eaa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ReportingTaskEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ReportingTaskEntityMerger.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; public class ReportingTaskEntityMerger implements ComponentEntityMerger<ReportingTaskEntity> { @@ -34,6 +35,7 @@ public class ReportingTaskEntityMerger implements ComponentEntityMerger<Reportin * @param clientEntity the entity being returned to the client * @param entityMap all node responses */ + @Override public void mergeComponents(final ReportingTaskEntity clientEntity, final Map<NodeIdentifier, ReportingTaskEntity> entityMap) { final ReportingTaskDTO clientDto = clientEntity.getComponent(); final Map<NodeIdentifier, ReportingTaskDTO> dtoMap = new HashMap<>(); @@ -91,6 +93,11 @@ public class ReportingTaskEntityMerger implements ComponentEntityMerger<Reportin // set the merged active thread counts clientDto.setActiveThreadCount(activeThreadCount); + final Set<String> validationStatuses = dtoMap.values().stream() + .map(ReportingTaskDTO::getValidationStatus) + .collect(Collectors.toSet()); + clientDto.setValidationStatus(ErrorMerger.mergeValidationStatus(validationStatuses)); + // set the merged the validation errors clientDto.setValidationErrors(ErrorMerger.normalizedMergedErrors(validationErrorMap, dtoMap.size())); } http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java index 90b05aa..e0477a7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java @@ -20,17 +20,15 @@ package org.apache.nifi.cluster.spring; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback; import org.apache.nifi.cluster.coordination.http.replication.ThreadPoolRequestReplicator; +import org.apache.nifi.cluster.coordination.http.replication.okhttp.OkHttpReplicationClient; import org.apache.nifi.events.EventReporter; -import org.apache.nifi.framework.security.util.SslContextFactory; import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.util.WebUtils; +import org.apache.nifi.web.util.NiFiHostnameVerifier; import org.springframework.beans.BeansException; import org.springframework.beans.factory.FactoryBean; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; -import javax.ws.rs.client.Client; - public class ThreadPoolRequestReplicatorFactoryBean implements FactoryBean<ThreadPoolRequestReplicator>, ApplicationContextAware { private ApplicationContext applicationContext; private NiFiProperties nifiProperties; @@ -47,12 +45,11 @@ public class ThreadPoolRequestReplicatorFactoryBean implements FactoryBean<Threa final int corePoolSize = 
nifiProperties.getClusterNodeProtocolCorePoolSize(); final int maxPoolSize = nifiProperties.getClusterNodeProtocolMaxPoolSize(); final int maxConcurrentRequests = nifiProperties.getClusterNodeMaxConcurrentRequests(); - final Client jerseyClient = WebUtils.createClient(null, SslContextFactory.createSslContext(nifiProperties)); - final String connectionTimeout = nifiProperties.getClusterNodeConnectionTimeout(); - final String readTimeout = nifiProperties.getClusterNodeReadTimeout(); - replicator = new ThreadPoolRequestReplicator(corePoolSize, maxPoolSize, maxConcurrentRequests, jerseyClient, clusterCoordinator, - connectionTimeout, readTimeout, requestCompletionCallback, eventReporter, nifiProperties); + final OkHttpReplicationClient replicationClient = new OkHttpReplicationClient(nifiProperties, new NiFiHostnameVerifier()); + + replicator = new ThreadPoolRequestReplicator(corePoolSize, maxPoolSize, maxConcurrentRequests, replicationClient, clusterCoordinator, + requestCompletionCallback, eventReporter, nifiProperties); } return replicator; http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java index 1f0ceb5..15b7774 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java @@ -16,13 +16,40 @@ */ package org.apache.nifi.cluster.coordination.http.replication; -import org.apache.commons.collections4.map.MultiValueMap; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.net.SocketTimeoutException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.ws.rs.HttpMethod; +import javax.ws.rs.ProcessingException; +import javax.ws.rs.core.MultivaluedHashMap; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; + import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserDetails; import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.authorization.user.StandardNiFiUser; import org.apache.nifi.authorization.user.StandardNiFiUser.Builder; import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.coordination.http.replication.util.MockReplicationClient; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; import org.apache.nifi.cluster.manager.NodeResponse; @@ -30,49 +57,21 @@ import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestExc import 
org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException; import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException; import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.events.EventReporter; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.api.entity.Entity; import org.apache.nifi.web.api.entity.ProcessorEntity; import org.apache.nifi.web.security.ProxiedEntitiesUtils; import org.apache.nifi.web.security.token.NiFiAuthenticationToken; -import org.glassfish.jersey.client.ClientRequest; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; -import org.mockito.internal.util.reflection.Whitebox; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.springframework.security.core.Authentication; import org.springframework.security.core.context.SecurityContextHolder; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.ProcessingException; -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.client.Invocation; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.Response.Status; -import java.net.SocketTimeoutException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - public class TestThreadPoolRequestReplicator { @BeforeClass @@ -234,13 +233,17 @@ public class 
TestThreadPoolRequestReplicator { final AtomicInteger requestCount = new AtomicInteger(0); final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null, null); - final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, 100, ClientBuilder.newClient(), coordinator, "1 sec", "1 sec", null, null, props) { + + final MockReplicationClient client = new MockReplicationClient(); + final RequestCompletionCallback requestCompletionCallback = (uri, method, responses) -> { + }; + + final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, 100, client, coordinator, requestCompletionCallback, EventReporter.NO_OP, props) { @Override - protected NodeResponse replicateRequest(final Invocation invocation, final NodeIdentifier nodeId, final String method, - final URI uri, final String requestId, Map<String, String> givenHeaders, final StandardAsyncClusterResponse response) { + protected NodeResponse replicateRequest(final PreparedRequest request, final NodeIdentifier nodeId, + final URI uri, final String requestId, final StandardAsyncClusterResponse response) { // the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them. - final ClientRequest requestContext = (ClientRequest) Whitebox.getInternalState(invocation, "requestContext"); - final Object expectsHeader = requestContext.getHeaders().getFirst(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); + final Object expectsHeader = request.getHeaders().get(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); final int statusCode; if (requestCount.incrementAndGet() == 1) { @@ -254,7 +257,7 @@ public class TestThreadPoolRequestReplicator { // Return given response from all nodes. 
final Response clientResponse = mock(Response.class); when(clientResponse.getStatus()).thenReturn(statusCode); - return new NodeResponse(nodeId, method, uri, clientResponse, -1L, requestId); + return new NodeResponse(nodeId, request.getMethod(), uri, clientResponse, -1L, requestId); } }; @@ -307,7 +310,12 @@ public class TestThreadPoolRequestReplicator { when(coordinator.getConnectionStates()).thenReturn(nodeMap); final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null, null); - final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, 100, ClientBuilder.newClient(), coordinator, "1 sec", "1 sec", null, null, props) { + + final MockReplicationClient client = new MockReplicationClient(); + final RequestCompletionCallback requestCompletionCallback = (uri, method, responses) -> { + }; + + final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, 100, client, coordinator, requestCompletionCallback, EventReporter.NO_OP, props) { @Override public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean indicateReplicated, boolean verify) { @@ -346,7 +354,7 @@ public class TestThreadPoolRequestReplicator { } // should not throw an Exception because it's a GET - replicator.replicate(HttpMethod.GET, new URI("http://localhost:80/processors/1"), new MultiValueMap<>(), new HashMap<>()); + replicator.replicate(HttpMethod.GET, new URI("http://localhost:80/processors/1"), new MultivaluedHashMap<>(), new HashMap<>()); // should not throw an Exception because all nodes are now connected nodeMap.remove(NodeConnectionState.DISCONNECTING); @@ -365,13 +373,17 @@ public class TestThreadPoolRequestReplicator { final ClusterCoordinator coordinator = createClusterCoordinator(); final AtomicInteger requestCount = new AtomicInteger(0); final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null, null); - final 
ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, 100, ClientBuilder.newClient(), coordinator, "1 sec", "1 sec", null, null, props) { + + final MockReplicationClient client = new MockReplicationClient(); + final RequestCompletionCallback requestCompletionCallback = (uri, method, responses) -> { + }; + + final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, 100, client, coordinator, requestCompletionCallback, EventReporter.NO_OP, props) { @Override - protected NodeResponse replicateRequest(final Invocation invocation, final NodeIdentifier nodeId, final String method, - final URI uri, final String requestId, Map<String, String> givenHeaders, final StandardAsyncClusterResponse response) { + protected NodeResponse replicateRequest(final PreparedRequest request, final NodeIdentifier nodeId, + final URI uri, final String requestId, final StandardAsyncClusterResponse response) { // the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them. 
- final ClientRequest requestContext = (ClientRequest) Whitebox.getInternalState(invocation, "requestContext"); - final Object expectsHeader = requestContext.getHeaders().getFirst(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); + final Object expectsHeader = request.getHeaders().get(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); final int requestIndex = requestCount.incrementAndGet(); assertEquals(ThreadPoolRequestReplicator.NODE_CONTINUE, expectsHeader); @@ -379,10 +391,10 @@ public class TestThreadPoolRequestReplicator { if (requestIndex == 1) { final Response clientResponse = mock(Response.class); when(clientResponse.getStatus()).thenReturn(150); - return new NodeResponse(nodeId, method, uri, clientResponse, -1L, requestId); + return new NodeResponse(nodeId, request.getMethod(), uri, clientResponse, -1L, requestId); } else { final IllegalClusterStateException explanation = new IllegalClusterStateException("Intentional Exception for Unit Testing"); - return new NodeResponse(nodeId, method, uri, explanation); + return new NodeResponse(nodeId, request.getMethod(), uri, explanation); } } }; @@ -576,10 +588,15 @@ public class TestThreadPoolRequestReplicator { private void withReplicator(final WithReplicator function, final Status status, final long delayMillis, final RuntimeException failure, final String expectedRequestChain) { final ClusterCoordinator coordinator = createClusterCoordinator(); final NiFiProperties nifiProps = NiFiProperties.createBasicNiFiProperties(null, null); - final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, 100, ClientBuilder.newClient(), coordinator, "1 sec", "1 sec", null, null, nifiProps) { + final MockReplicationClient client = new MockReplicationClient(); + final RequestCompletionCallback requestCompletionCallback = (uri, method, responses) -> { + }; + + final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, 100, client, coordinator, 
requestCompletionCallback, EventReporter.NO_OP, nifiProps) { @Override - protected NodeResponse replicateRequest(final Invocation invocation, final NodeIdentifier nodeId, final String method, - final URI uri, final String requestId, Map<String, String> givenHeaders, final StandardAsyncClusterResponse response) { + protected NodeResponse replicateRequest(final PreparedRequest request, final NodeIdentifier nodeId, final URI uri, final String requestId, + final StandardAsyncClusterResponse response) { + if (delayMillis > 0L) { try { Thread.sleep(delayMillis); @@ -592,8 +609,7 @@ public class TestThreadPoolRequestReplicator { throw failure; } - final ClientRequest requestContext = (ClientRequest) Whitebox.getInternalState(invocation, "requestContext"); - final Object proxiedEntities = requestContext.getHeaders().getFirst(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN); + final Object proxiedEntities = request.getHeaders().get(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN); // ensure the request chain is in the request Assert.assertEquals(expectedRequestChain, proxiedEntities); @@ -601,7 +617,7 @@ public class TestThreadPoolRequestReplicator { // Return given response from all nodes. 
final Response clientResponse = mock(Response.class); when(clientResponse.getStatus()).thenReturn(status.getStatusCode()); - return new NodeResponse(nodeId, method, uri, clientResponse, -1L, requestId); + return new NodeResponse(nodeId, request.getMethod(), uri, clientResponse, -1L, requestId); } }; http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/TestJsonEntitySerializer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/TestJsonEntitySerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/TestJsonEntitySerializer.java new file mode 100644 index 0000000..dd96826 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/TestJsonEntitySerializer.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.replication.okhttp; + +import static org.junit.Assert.assertEquals; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Date; + +import org.apache.nifi.web.api.dto.BulletinDTO; +import org.apache.nifi.web.api.dto.ProcessorConfigDTO; +import org.apache.nifi.web.api.dto.ProcessorDTO; +import org.apache.nifi.web.api.dto.util.TimeAdapter; +import org.apache.nifi.web.api.entity.BulletinEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.junit.Test; + +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.module.jaxb.JaxbAnnotationModule; + +public class TestJsonEntitySerializer { + + @Test + public void testSerializeProcessor() throws IOException { + final ObjectMapper jsonCodec = new ObjectMapper(); + jsonCodec.registerModule(new JaxbAnnotationModule()); + jsonCodec.setSerializationInclusion(Include.NON_NULL); + + // Test that we can properly serialize a ProcessorEntity because it has many nested levels, including a Map + final JsonEntitySerializer serializer = new JsonEntitySerializer(jsonCodec); + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + + final ProcessorConfigDTO configDto = new ProcessorConfigDTO(); + configDto.setProperties(Collections.singletonMap("key", "value")); + final ProcessorDTO processorDto = new ProcessorDTO(); + processorDto.setConfig(configDto); + + final ProcessorEntity processor = new ProcessorEntity(); + processor.setId("123"); + processor.setComponent(processorDto); + + serializer.serialize(processor, baos); + + final String serialized = new String(baos.toByteArray(), StandardCharsets.UTF_8); + 
assertEquals("{\"id\":\"123\",\"component\":{\"config\":{\"properties\":{\"key\":\"value\"}}}}", serialized); + } + } + + @Test + public void testBulletinEntity() throws Exception { + final ObjectMapper jsonCodec = new ObjectMapper(); + jsonCodec.registerModule(new JaxbAnnotationModule()); + jsonCodec.setSerializationInclusion(Include.NON_NULL); + + final Date timestamp = new Date(); + final TimeAdapter adapter = new TimeAdapter(); + final String formattedTimestamp = adapter.marshal(timestamp); + + // Test that we can properly serialize a Bulletin because it contains a timestamp, + // which uses a JAXB annotation to specify how to marshal it. + final JsonEntitySerializer serializer = new JsonEntitySerializer(jsonCodec); + + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + + final BulletinDTO bulletinDto = new BulletinDTO(); + bulletinDto.setCategory("test"); + bulletinDto.setLevel("INFO"); + bulletinDto.setTimestamp(timestamp); + + final BulletinEntity bulletin = new BulletinEntity(); + bulletin.setBulletin(bulletinDto); + serializer.serialize(bulletin, baos); + + final String serialized = new String(baos.toByteArray(), StandardCharsets.UTF_8); + assertEquals("{\"bulletin\":{\"category\":\"test\",\"level\":\"INFO\",\"timestamp\":\"" + formattedTimestamp + "\"}}", serialized); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/util/MockReplicationClient.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/util/MockReplicationClient.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/util/MockReplicationClient.java new file mode 100644 index 0000000..54a8ac7 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/util/MockReplicationClient.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.cluster.coordination.http.replication.util; + +import java.io.IOException; +import java.lang.annotation.Annotation; +import java.net.URI; +import java.util.Collections; +import java.util.Date; +import java.util.Locale; +import java.util.Map; +import java.util.Set; + +import javax.ws.rs.core.EntityTag; +import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.Link; +import javax.ws.rs.core.Link.Builder; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedHashMap; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.NewCookie; +import javax.ws.rs.core.Response; + +import org.apache.nifi.cluster.coordination.http.replication.HttpReplicationClient; +import org.apache.nifi.cluster.coordination.http.replication.PreparedRequest; + +/** Test stub of {@link HttpReplicationClient}: replicate() returns a canned Response built from the status, entity and headers held in this object (initially 200, null and an empty header map). */ public class MockReplicationClient implements HttpReplicationClient { + private int status = 200; + private Object responseEntity = null; + private MultivaluedMap<String, String> headers = new MultivaluedHashMap<>(); + + /** Replaces the canned status, entity and headers; the header map is stored as passed, without copying. */ public void setResponse(final int status, final Object responseEntity, final MultivaluedMap<String, String> headers) { + this.status = status; + this.responseEntity = responseEntity; + this.headers = headers; + } + + /** Returns a PreparedRequest whose getters hand back the given method, headers and entity unchanged. */ @Override + public PreparedRequest prepareRequest(String method, Map<String, String> headers, Object entity) { + return new PreparedRequest() { + @Override + public String getMethod() { + return method; + } + + @Override + public Map<String, String> getHeaders() { + return headers; + } + + @Override + public Object getEntity() { + return entity; + } + }; + } + + /** Returns a stub Response; the request argument is not read, and uri is used only by getLocation(). The stub reads status, responseEntity and headers from this mock each time it is queried, so a later setResponse() call is visible through a Response returned earlier. */ @Override + public Response replicate(PreparedRequest request, String uri) throws IOException { + return new Response() { + + @Override + public int getStatus() { + return status; + } + + @Override + public StatusType getStatusInfo() { + return Status.fromStatusCode(status); + } + + @Override + public Object getEntity() { + return responseEntity; + } + + @Override + public <T> T 
readEntity(Class<T> entityType) { /* null when no entity is set; otherwise the entity cast to entityType, or IllegalArgumentException when it is not an instance of that type */ + if (responseEntity == null) { + return null; + } + + if (entityType.isAssignableFrom(responseEntity.getClass())) { + return entityType.cast(responseEntity); + } + + throw new IllegalArgumentException("Cannot cast entity as " + entityType); + } + + /* the GenericType and annotation-taking readEntity variants below are unsupported in this stub */ @Override + public <T> T readEntity(GenericType<T> entityType) { + throw new UnsupportedOperationException(); + } + + @Override + public <T> T readEntity(Class<T> entityType, Annotation[] annotations) { + throw new UnsupportedOperationException(); + } + + @Override + public <T> T readEntity(GenericType<T> entityType, Annotation[] annotations) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasEntity() { + return responseEntity != null; + } + + @Override + public boolean bufferEntity() { + return true; + } + + @Override + public void close() { + } + + @Override + public MediaType getMediaType() { + return MediaType.APPLICATION_JSON_TYPE; + } + + @Override + public Locale getLanguage() { + return null; + } + + @Override + public int getLength() { + return 0; + } + + @Override + public Set<String> getAllowedMethods() { + return Collections.emptySet(); + } + + @Override + public Map<String, NewCookie> getCookies() { + return Collections.emptyMap(); + } + + @Override + public EntityTag getEntityTag() { + return null; + } + + @Override + public Date getDate() { + return null; + } + + @Override + public Date getLastModified() { + return null; + } + + @Override + public URI getLocation() { /* echoes the uri given to replicate() */ + return URI.create(uri); + } + + @Override + public Set<Link> getLinks() { + return Collections.emptySet(); + } + + @Override + public boolean hasLink(String relation) { + return false; + } + + @Override + public Link getLink(String relation) { + return null; + } + + @Override + public Builder getLinkBuilder(String relation) { + return null; + } + + @Override + @SuppressWarnings({"unchecked", "rawtypes"}) + public MultivaluedMap<String, Object> getMetadata() { + return 
(MultivaluedMap) headers; /* raw cast: the same map instance that getStringHeaders() returns, not a copy */ + } + + @Override + public MultivaluedMap<String, String> getStringHeaders() { + return headers; + } + + @Override + public String getHeaderString(String name) { + return headers.getFirst(name); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMergerSpec.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMergerSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMergerSpec.groovy index b6bfd65..f616891 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMergerSpec.groovy +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMergerSpec.groovy @@ -69,7 +69,7 @@ class ControllerServiceEntityMergerSpec extends Specification { component: new ControllerServiceReferencingComponentDTO(activeThreadCount: 1, state: ControllerServiceState.ENABLING.name()))]))] || new ControllerServiceEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), bulletins: [], - component: new ControllerServiceDTO(validationErrors: [], + component: new ControllerServiceDTO(validationErrors: [], validationStatus: "VALID", referencingComponents: [new ControllerServiceReferencingComponentEntity(permissions: new PermissionsDTO(canRead: false, canWrite: false))])) // Controller Reference merging for canRead==true [(createNodeIdentifier(1)): new ControllerServiceEntity(id: '1', permissions: new 
PermissionsDTO(canRead: true, canWrite: true), @@ -83,7 +83,7 @@ class ControllerServiceEntityMergerSpec extends Specification { component: new ControllerServiceReferencingComponentDTO(activeThreadCount: 1, state: ControllerServiceState.ENABLING.name()))]))] || new ControllerServiceEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), bulletins: [], - component: new ControllerServiceDTO(validationErrors: [], + component: new ControllerServiceDTO(validationErrors: [], validationStatus: "VALID", referencingComponents: [new ControllerServiceReferencingComponentEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new ControllerServiceReferencingComponentDTO(activeThreadCount: 3, state: ControllerServiceState.ENABLING.name()))])) } http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/validation/DisabledServiceValidationResult.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/validation/DisabledServiceValidationResult.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/validation/DisabledServiceValidationResult.java new file mode 100644 index 0000000..7cf7c31 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/validation/DisabledServiceValidationResult.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.validation; + +import org.apache.nifi.components.ValidationResult; + +/** A {@link ValidationResult} that is always built as invalid and records the identifier of the Controller Service it refers to; the identifier is also passed to the builder as the input value. */ public class DisabledServiceValidationResult extends ValidationResult { + private String serviceId; + + /** Builds the result with a default explanation of the form: Controller Service with ID [serviceId] is disabled. */ public DisabledServiceValidationResult(final String subject, final String serviceId) { + this(subject, serviceId, "Controller Service with ID " + serviceId + " is disabled"); + } + + /** Builds the result with a caller-supplied explanation; subject and explanation are handed to the builder unchanged. */ public DisabledServiceValidationResult(final String subject, final String serviceId, final String explanation) { + super(new ValidationResult.Builder() + .input(serviceId) + .subject(subject) + .valid(false) + .explanation(explanation)); + + this.serviceId = serviceId; + } + + /** Returns the serviceId given at construction. */ public String getControllerServiceIdentifier() { + return serviceId; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/validation/ValidationState.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/validation/ValidationState.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/validation/ValidationState.java new file mode 100644 index 0000000..17b3ff2 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/validation/ValidationState.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.validation; + +import java.util.Collection; + +import org.apache.nifi.components.ValidationResult; + +/** Holder pairing a {@link ValidationStatus} with a collection of {@link ValidationResult}s; both references are fixed at construction. */ public class ValidationState { + private final ValidationStatus status; + private final Collection<ValidationResult> validationErrors; + + /** Stores both arguments as given; the collection is not copied. */ public ValidationState(final ValidationStatus status, final Collection<ValidationResult> validationErrors) { + this.status = status; + this.validationErrors = validationErrors; + } + + public ValidationStatus getStatus() { + return status; + } + + /** Returns the same collection instance that was passed to the constructor. */ public Collection<ValidationResult> getValidationErrors() { + return validationErrors; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/validation/ValidationStatus.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/validation/ValidationStatus.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/validation/ValidationStatus.java new file mode 100644 index 0000000..e5e84f3 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/validation/ValidationStatus.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.validation; + +/** Validation outcome of a component: valid, invalid, or still being validated. */ public enum ValidationStatus { + /** + * Component is valid + */ + VALID, + + /** + * Component is not valid + */ + INVALID, + + /** + * Component is in the process of validation, and it is unknown at this time whether the component is truly valid or not. 
+ */ + VALIDATING; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/validation/ValidationTrigger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/validation/ValidationTrigger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/validation/ValidationTrigger.java new file mode 100644 index 0000000..c117a1b --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/validation/ValidationTrigger.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.components.validation; + +import org.apache.nifi.controller.ComponentNode; + +/** Entry point for requesting validation of a {@link ComponentNode}, either asynchronously or immediately in the calling thread. */ public interface ValidationTrigger { + /** + * Triggers validation of the given component to occur asynchronously + * + * @param component the component to validate + */ + void triggerAsync(ComponentNode component); + + /** + * Triggers validation of the given component immediately in the current thread + * + * @param component the component to validate + */ + void trigger(ComponentNode component); +}
