bejancsaba commented on code in PR #6434:
URL: https://github.com/apache/nifi/pull/6434#discussion_r984463436
##########
c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java:
##########
@@ -123,18 +129,41 @@ public Optional<byte[]> retrieveUpdateContent(String
flowUpdateUrl) {
@Override
public void acknowledgeOperation(C2OperationAck operationAck) {
- logger.info("Acknowledging Operation [{}] C2 URL [{}]",
operationAck.getOperationId(), clientConfig.getC2AckUrl());
+ logger.info("Acknowledging Operation {} C2 URL {}",
operationAck.getOperationId(), clientConfig.getC2AckUrl());
serializer.serialize(operationAck)
- .map(operationAckBody -> RequestBody.create(operationAckBody,
MEDIA_TYPE_APPLICATION_JSON))
+ .map(operationAckBody -> create(operationAckBody,
MEDIA_TYPE_APPLICATION_JSON))
.map(requestBody -> new
Request.Builder().post(requestBody).url(clientConfig.getC2AckUrl()).build())
.map(C2RequestCompression.forType(clientConfig.getC2RequestCompression())::compress)
.ifPresent(this::sendAck);
}
+ @Override
+ public Optional<String> uploadDebugBundle(String debugCallbackUrl, byte[]
debugBundle) {
Review Comment:
As this is a pretty "non-specific" class, what do you think about renaming
this and the related parameters / variables / logging to just uploadBundle or
uploadPayload? It could be anything; it doesn't have to be debug. Since both
the url and the content are parameters, this is transparent from the client's
point of view.
##########
c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java:
##########
@@ -143,7 +172,7 @@ private Optional<C2HeartbeatResponse> sendHeartbeat(String
heartbeat) {
try (Response heartbeatResponse =
httpClientReference.get().newCall(decoratedRequest).execute()) {
c2HeartbeatResponse =
getResponseBody(heartbeatResponse).flatMap(response ->
serializer.deserialize(response, C2HeartbeatResponse.class));
} catch (IOException ce) {
- logger.error("Send Heartbeat failed [{}]",
clientConfig.getC2Url(), ce);
+ logger.error("Send Heartbeat failed to url {}",
clientConfig.getC2Url(), ce);
Review Comment:
I see you really didn't like those brackets; I suppose it is ok either way.
It is minor, but just for consistency: we are logging the url in multiple log
messages, so if we want to emphasise "to url" here, maybe it could be added
for the other occurrences in this class as well.
##########
c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DebugOperationHandlerTest.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.write;
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static
org.apache.nifi.c2.client.service.operation.DebugOperationHandler.NEW_LINE;
+import static
org.apache.nifi.c2.client.service.operation.DebugOperationHandler.TARGET_ARG;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static org.apache.nifi.c2.protocol.api.OperandType.DEBUG;
+import static org.apache.nifi.c2.protocol.api.OperationType.TRANSFER;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import org.apache.nifi.c2.client.api.C2Client;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class DebugOperationHandlerTest {
+
+ private static final String OPERATION_ID = "operationId";
+ private static final String C2_DEBUG_UPLOAD_ENDPOINT =
"https://host/c2/api/upload";
+ private static final String DEFAULT_FILE_CONTENT = "some_textual_data";
+ private static final List<Path> VALID_BUNDLE_FILE_LIST =
singletonList(Paths.get("path_to_file"));
+ private static final Predicate<String> DEFAULT_CONTENT_FILTER = text ->
true;
+
+ @Mock
+ private C2Client c2Client;
+
+ @TempDir
+ private File tempDir;
+
+ private static Stream<Arguments> invalidConstructorArguments() {
+ C2Client mockC2Client = mock(C2Client.class);
+ return Stream.of(
+ Arguments.of(null, null, null),
+ Arguments.of(null, VALID_BUNDLE_FILE_LIST, DEFAULT_CONTENT_FILTER),
+ Arguments.of(mockC2Client, null, DEFAULT_CONTENT_FILTER),
+ Arguments.of(mockC2Client, emptyList(), DEFAULT_CONTENT_FILTER),
+ Arguments.of(mockC2Client, VALID_BUNDLE_FILE_LIST, null)
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("invalidConstructorArguments")
Review Comment:
Very nice and thorough test. Just one note: you could customise the display
name by referencing the parameters, so that when it is executed it prints which
parameter combination is passed for each test. That makes it easy to see
instantly what went wrong in case of errors.
##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/DebugOperationHandler.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import static java.nio.file.Files.copy;
+import static java.nio.file.Files.createTempDirectory;
+import static java.nio.file.Files.deleteIfExists;
+import static java.nio.file.Files.lines;
+import static java.nio.file.Files.write;
+import static java.util.Optional.empty;
+import static java.util.Optional.ofNullable;
+import static org.apache.commons.compress.utils.IOUtils.closeQuietly;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static org.apache.nifi.c2.protocol.api.OperandType.DEBUG;
+import static org.apache.nifi.c2.protocol.api.OperationType.TRANSFER;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+import org.apache.nifi.c2.client.api.C2Client;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DebugOperationHandler implements C2OperationHandler {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DebugOperationHandler.class);
+
+ private static final String C2_CALLBACK_URL_NOT_FOUND = "C2 Server
callback URL was not found in request";
+ private static final String SUCCESSFUL_UPLOAD = "Debug bundle was uploaded
successfully";
+ private static final String UNABLE_TO_CREATE_BUNDLE = "Unable to create
debug bundle";
+
+ static final String TARGET_ARG = "target";
+ static final String NEW_LINE = "\n";
+
+ private final C2Client c2Client;
+ private final List<Path> bundleFilePaths;
+ private final Predicate<String> contentFilter;
+
+ private DebugOperationHandler(C2Client c2Client, List<Path>
bundleFilePaths, Predicate<String> contentFilter) {
+ this.c2Client = c2Client;
+ this.bundleFilePaths = bundleFilePaths;
+ this.contentFilter = contentFilter;
+ }
+
+ public static DebugOperationHandler create(C2Client c2Client, List<Path>
bundleFilePaths, Predicate<String> contentFilter) {
+ if (c2Client == null) {
+ throw new IllegalArgumentException("C2Client should not be null");
+ }
+ if (bundleFilePaths == null || bundleFilePaths.isEmpty()) {
+ throw new IllegalArgumentException("bundleFilePaths should not be
not null or empty");
+ }
+ if (contentFilter == null) {
+ throw new IllegalArgumentException("Exclude sensitive filter
should not be null");
+ }
+
+ return new DebugOperationHandler(c2Client, bundleFilePaths,
contentFilter);
+ }
+
+ @Override
+ public OperationType getOperationType() {
+ return TRANSFER;
+ }
+
+ @Override
+ public OperandType getOperandType() {
+ return DEBUG;
+ }
+
+ @Override
+ public C2OperationAck handle(C2Operation operation) {
+ String debugCallbackUrl = operation.getArgs().get(TARGET_ARG);
+ if (debugCallbackUrl == null) {
+ LOG.error("Callback URL was not found in C2 request.");
+ return operationAck(operation, operationState(NOT_APPLIED,
C2_CALLBACK_URL_NOT_FOUND));
+ }
+
+ List<Path> contentFilteredFilePaths = null;
+ C2OperationState operationState;
+ try {
+ contentFilteredFilePaths = filterContent(bundleFilePaths);
+ operationState = createDebugBundle(contentFilteredFilePaths)
+ .map(bundle -> c2Client.uploadDebugBundle(debugCallbackUrl,
bundle)
+ .map(errorMessage -> operationState(NOT_APPLIED,
errorMessage))
+ .orElseGet(() -> operationState(FULLY_APPLIED,
SUCCESSFUL_UPLOAD)))
+ .orElseGet(() -> operationState(NOT_APPLIED,
UNABLE_TO_CREATE_BUNDLE));
+ } catch (Exception e) {
+ LOG.error("Unexpected error happened", e);
+ operationState = operationState(NOT_APPLIED,
UNABLE_TO_CREATE_BUNDLE);
+ } finally {
+ ofNullable(contentFilteredFilePaths).ifPresent(this::cleanup);
+ }
+
+ LOG.debug("Returning operation ack with state {} and details {}",
operationState.getState(), operationState.getDetails());
Review Comment:
For debug purposes I suppose the operationId logging would be helpful as
well.
##########
c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java:
##########
@@ -123,18 +129,41 @@ public Optional<byte[]> retrieveUpdateContent(String
flowUpdateUrl) {
@Override
public void acknowledgeOperation(C2OperationAck operationAck) {
- logger.info("Acknowledging Operation [{}] C2 URL [{}]",
operationAck.getOperationId(), clientConfig.getC2AckUrl());
+ logger.info("Acknowledging Operation {} C2 URL {}",
operationAck.getOperationId(), clientConfig.getC2AckUrl());
serializer.serialize(operationAck)
- .map(operationAckBody -> RequestBody.create(operationAckBody,
MEDIA_TYPE_APPLICATION_JSON))
+ .map(operationAckBody -> create(operationAckBody,
MEDIA_TYPE_APPLICATION_JSON))
.map(requestBody -> new
Request.Builder().post(requestBody).url(clientConfig.getC2AckUrl()).build())
.map(C2RequestCompression.forType(clientConfig.getC2RequestCompression())::compress)
.ifPresent(this::sendAck);
}
+ @Override
+ public Optional<String> uploadDebugBundle(String debugCallbackUrl, byte[]
debugBundle) {
+ Request request = new Request.Builder()
+ .url(debugCallbackUrl)
+ .post(new MultipartBody.Builder()
+ .setType(FORM)
+ .addFormDataPart(MULTIPART_FORM_FILE_FIELD_NAME,
DEBUG_BUNDLE_FILE_NAME, create(debugBundle, DEBUG_BUNDLE_MIME_TYPE))
+ .build())
+ .build();
+
+ logger.info("Uploading debug bundle to url {} with size {}",
debugCallbackUrl, debugBundle.length);
+ try (Response response =
httpClientReference.get().newCall(request).execute()) {
+ if (!response.isSuccessful()) {
+ logger.warn("Upload debug bundle failed to C2 server {} with
status code {}", debugCallbackUrl, response.code());
Review Comment:
Should this not be an error as well, like the one in the catch block? I mean,
from the user's point of view both have the same effect.
##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/DebugOperationHandler.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import static java.nio.file.Files.copy;
+import static java.nio.file.Files.createTempDirectory;
+import static java.nio.file.Files.deleteIfExists;
+import static java.nio.file.Files.lines;
+import static java.nio.file.Files.write;
+import static java.util.Optional.empty;
+import static java.util.Optional.ofNullable;
+import static org.apache.commons.compress.utils.IOUtils.closeQuietly;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static org.apache.nifi.c2.protocol.api.OperandType.DEBUG;
+import static org.apache.nifi.c2.protocol.api.OperationType.TRANSFER;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+import org.apache.nifi.c2.client.api.C2Client;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DebugOperationHandler implements C2OperationHandler {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DebugOperationHandler.class);
+
+ private static final String C2_CALLBACK_URL_NOT_FOUND = "C2 Server
callback URL was not found in request";
+ private static final String SUCCESSFUL_UPLOAD = "Debug bundle was uploaded
successfully";
+ private static final String UNABLE_TO_CREATE_BUNDLE = "Unable to create
debug bundle";
+
+ static final String TARGET_ARG = "target";
+ static final String NEW_LINE = "\n";
+
+ private final C2Client c2Client;
+ private final List<Path> bundleFilePaths;
+ private final Predicate<String> contentFilter;
+
+ private DebugOperationHandler(C2Client c2Client, List<Path>
bundleFilePaths, Predicate<String> contentFilter) {
+ this.c2Client = c2Client;
+ this.bundleFilePaths = bundleFilePaths;
+ this.contentFilter = contentFilter;
+ }
+
+ public static DebugOperationHandler create(C2Client c2Client, List<Path>
bundleFilePaths, Predicate<String> contentFilter) {
+ if (c2Client == null) {
+ throw new IllegalArgumentException("C2Client should not be null");
+ }
+ if (bundleFilePaths == null || bundleFilePaths.isEmpty()) {
+ throw new IllegalArgumentException("bundleFilePaths should not be
not null or empty");
+ }
+ if (contentFilter == null) {
+ throw new IllegalArgumentException("Exclude sensitive filter
should not be null");
+ }
+
+ return new DebugOperationHandler(c2Client, bundleFilePaths,
contentFilter);
+ }
+
+ @Override
+ public OperationType getOperationType() {
+ return TRANSFER;
+ }
+
+ @Override
+ public OperandType getOperandType() {
+ return DEBUG;
+ }
+
+ @Override
+ public C2OperationAck handle(C2Operation operation) {
+ String debugCallbackUrl = operation.getArgs().get(TARGET_ARG);
+ if (debugCallbackUrl == null) {
+ LOG.error("Callback URL was not found in C2 request.");
+ return operationAck(operation, operationState(NOT_APPLIED,
C2_CALLBACK_URL_NOT_FOUND));
+ }
+
+ List<Path> contentFilteredFilePaths = null;
+ C2OperationState operationState;
+ try {
+ contentFilteredFilePaths = filterContent(bundleFilePaths);
+ operationState = createDebugBundle(contentFilteredFilePaths)
+ .map(bundle -> c2Client.uploadDebugBundle(debugCallbackUrl,
bundle)
+ .map(errorMessage -> operationState(NOT_APPLIED,
errorMessage))
+ .orElseGet(() -> operationState(FULLY_APPLIED,
SUCCESSFUL_UPLOAD)))
+ .orElseGet(() -> operationState(NOT_APPLIED,
UNABLE_TO_CREATE_BUNDLE));
+ } catch (Exception e) {
+ LOG.error("Unexpected error happened", e);
+ operationState = operationState(NOT_APPLIED,
UNABLE_TO_CREATE_BUNDLE);
+ } finally {
+ ofNullable(contentFilteredFilePaths).ifPresent(this::cleanup);
+ }
+
+ LOG.debug("Returning operation ack with state {} and details {}",
operationState.getState(), operationState.getDetails());
+ return operationAck(operation, operationState);
+ }
+
+ private C2OperationAck operationAck(C2Operation operation,
C2OperationState state) {
+ C2OperationAck operationAck = new C2OperationAck();
+
operationAck.setOperationId(ofNullable(operation.getIdentifier()).orElse(EMPTY));
+ operationAck.setOperationState(state);
+ return operationAck;
+ }
+
+ private C2OperationState operationState(OperationState operationState,
String details) {
+ C2OperationState state = new C2OperationState();
+ state.setState(operationState);
+ state.setDetails(details);
+ return state;
+ }
+
+ private List<Path> filterContent(List<Path> bundleFilePaths) {
+ List<Path> contentFilteredFilePaths = new ArrayList<>();
+ for (Path path : bundleFilePaths) {
+ String fileName = path.getFileName().toString();
+ try (Stream<String> fileStream = lines(path)) {
+ Path tempDirectory = createTempDirectory(null);
+ Path tempFile =
Paths.get(tempDirectory.toAbsolutePath().toString(), fileName);
+ write(tempFile, (Iterable<String>)
fileStream.filter(contentFilter)::iterator);
+ contentFilteredFilePaths.add(tempFile);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ return contentFilteredFilePaths;
+ }
+
+ private Optional<byte[]> createDebugBundle(List<Path> filePaths) {
+ ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
+ try (GzipCompressorOutputStream gzipCompressorOutputStream = new
GzipCompressorOutputStream(byteOutputStream);
Review Comment:
nice!
##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/DebugOperationHandler.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import static java.nio.file.Files.copy;
+import static java.nio.file.Files.createTempDirectory;
+import static java.nio.file.Files.deleteIfExists;
+import static java.nio.file.Files.lines;
+import static java.nio.file.Files.write;
+import static java.util.Optional.empty;
+import static java.util.Optional.ofNullable;
+import static org.apache.commons.compress.utils.IOUtils.closeQuietly;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static org.apache.nifi.c2.protocol.api.OperandType.DEBUG;
+import static org.apache.nifi.c2.protocol.api.OperationType.TRANSFER;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+import org.apache.nifi.c2.client.api.C2Client;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DebugOperationHandler implements C2OperationHandler {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DebugOperationHandler.class);
+
+ private static final String C2_CALLBACK_URL_NOT_FOUND = "C2 Server
callback URL was not found in request";
+ private static final String SUCCESSFUL_UPLOAD = "Debug bundle was uploaded
successfully";
+ private static final String UNABLE_TO_CREATE_BUNDLE = "Unable to create
debug bundle";
+
+ static final String TARGET_ARG = "target";
+ static final String NEW_LINE = "\n";
+
+ private final C2Client c2Client;
+ private final List<Path> bundleFilePaths;
+ private final Predicate<String> contentFilter;
+
+ private DebugOperationHandler(C2Client c2Client, List<Path>
bundleFilePaths, Predicate<String> contentFilter) {
+ this.c2Client = c2Client;
+ this.bundleFilePaths = bundleFilePaths;
+ this.contentFilter = contentFilter;
+ }
+
+ public static DebugOperationHandler create(C2Client c2Client, List<Path>
bundleFilePaths, Predicate<String> contentFilter) {
+ if (c2Client == null) {
+ throw new IllegalArgumentException("C2Client should not be null");
+ }
+ if (bundleFilePaths == null || bundleFilePaths.isEmpty()) {
+ throw new IllegalArgumentException("bundleFilePaths should not be
not null or empty");
+ }
+ if (contentFilter == null) {
+ throw new IllegalArgumentException("Exclude sensitive filter
should not be null");
Review Comment:
If you are explicit in the error message, maybe the variable name could be
more explicit as well, e.g. excludeSensitivePropertiesFilter or something
shorter :) What do you think?
##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/DebugOperationHandler.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import static java.nio.file.Files.copy;
+import static java.nio.file.Files.createTempDirectory;
+import static java.nio.file.Files.deleteIfExists;
+import static java.nio.file.Files.lines;
+import static java.nio.file.Files.write;
+import static java.util.Optional.empty;
+import static java.util.Optional.ofNullable;
+import static org.apache.commons.compress.utils.IOUtils.closeQuietly;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static org.apache.nifi.c2.protocol.api.OperandType.DEBUG;
+import static org.apache.nifi.c2.protocol.api.OperationType.TRANSFER;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+import org.apache.nifi.c2.client.api.C2Client;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DebugOperationHandler implements C2OperationHandler {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DebugOperationHandler.class);
+
+ private static final String C2_CALLBACK_URL_NOT_FOUND = "C2 Server
callback URL was not found in request";
+ private static final String SUCCESSFUL_UPLOAD = "Debug bundle was uploaded
successfully";
+ private static final String UNABLE_TO_CREATE_BUNDLE = "Unable to create
debug bundle";
+
+ static final String TARGET_ARG = "target";
+ static final String NEW_LINE = "\n";
+
+ private final C2Client c2Client;
+ private final List<Path> bundleFilePaths;
+ private final Predicate<String> contentFilter;
+
+ private DebugOperationHandler(C2Client c2Client, List<Path>
bundleFilePaths, Predicate<String> contentFilter) {
+ this.c2Client = c2Client;
+ this.bundleFilePaths = bundleFilePaths;
+ this.contentFilter = contentFilter;
+ }
+
+ public static DebugOperationHandler create(C2Client c2Client, List<Path>
bundleFilePaths, Predicate<String> contentFilter) {
+ if (c2Client == null) {
+ throw new IllegalArgumentException("C2Client should not be null");
+ }
+ if (bundleFilePaths == null || bundleFilePaths.isEmpty()) {
+ throw new IllegalArgumentException("bundleFilePaths should not be
not null or empty");
+ }
+ if (contentFilter == null) {
+ throw new IllegalArgumentException("Exclude sensitive filter
should not be null");
+ }
+
+ return new DebugOperationHandler(c2Client, bundleFilePaths,
contentFilter);
+ }
+
+ @Override
+ public OperationType getOperationType() {
+ return TRANSFER;
+ }
+
+ @Override
+ public OperandType getOperandType() {
+ return DEBUG;
+ }
+
+ @Override
+ public C2OperationAck handle(C2Operation operation) {
+ String debugCallbackUrl = operation.getArgs().get(TARGET_ARG);
+ if (debugCallbackUrl == null) {
+ LOG.error("Callback URL was not found in C2 request.");
+ return operationAck(operation, operationState(NOT_APPLIED,
C2_CALLBACK_URL_NOT_FOUND));
+ }
+
+ List<Path> contentFilteredFilePaths = null;
+ C2OperationState operationState;
+ try {
+ contentFilteredFilePaths = filterContent(bundleFilePaths);
+ operationState = createDebugBundle(contentFilteredFilePaths)
+ .map(bundle -> c2Client.uploadDebugBundle(debugCallbackUrl,
bundle)
+ .map(errorMessage -> operationState(NOT_APPLIED,
errorMessage))
+ .orElseGet(() -> operationState(FULLY_APPLIED,
SUCCESSFUL_UPLOAD)))
+ .orElseGet(() -> operationState(NOT_APPLIED,
UNABLE_TO_CREATE_BUNDLE));
+ } catch (Exception e) {
+ LOG.error("Unexpected error happened", e);
+ operationState = operationState(NOT_APPLIED,
UNABLE_TO_CREATE_BUNDLE);
+ } finally {
+ ofNullable(contentFilteredFilePaths).ifPresent(this::cleanup);
+ }
+
+ LOG.debug("Returning operation ack with state {} and details {}",
operationState.getState(), operationState.getDetails());
+ return operationAck(operation, operationState);
+ }
+
+ private C2OperationAck operationAck(C2Operation operation,
C2OperationState state) {
+ C2OperationAck operationAck = new C2OperationAck();
+
operationAck.setOperationId(ofNullable(operation.getIdentifier()).orElse(EMPTY));
+ operationAck.setOperationState(state);
+ return operationAck;
+ }
+
+ private C2OperationState operationState(OperationState operationState,
String details) {
+ C2OperationState state = new C2OperationState();
+ state.setState(operationState);
+ state.setDetails(details);
+ return state;
+ }
+
+ private List<Path> filterContent(List<Path> bundleFilePaths) {
+ List<Path> contentFilteredFilePaths = new ArrayList<>();
+ for (Path path : bundleFilePaths) {
+ String fileName = path.getFileName().toString();
+ try (Stream<String> fileStream = lines(path)) {
+ Path tempDirectory = createTempDirectory(null);
+ Path tempFile =
Paths.get(tempDirectory.toAbsolutePath().toString(), fileName);
+ write(tempFile, (Iterable<String>)
fileStream.filter(contentFilter)::iterator);
+ contentFilteredFilePaths.add(tempFile);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ return contentFilteredFilePaths;
+ }
+
+ private Optional<byte[]> createDebugBundle(List<Path> filePaths) {
+ ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
+ try (GzipCompressorOutputStream gzipCompressorOutputStream = new
GzipCompressorOutputStream(byteOutputStream);
+ TarArchiveOutputStream tarOutputStream = new
TarArchiveOutputStream(gzipCompressorOutputStream)) {
+ for (Path filePath : filePaths) {
+ TarArchiveEntry tarArchiveEntry = new
TarArchiveEntry(filePath.toFile(), filePath.getFileName().toString());
+ tarOutputStream.putArchiveEntry(tarArchiveEntry);
+ copy(filePath, tarOutputStream);
+ tarOutputStream.closeArchiveEntry();
+ }
+ tarOutputStream.finish();
+ } catch (Exception e) {
+ LOG.error("Error during create compressed bundle", e);
+ return empty();
+ } finally {
+ closeQuietly(byteOutputStream);
+ }
+ return
Optional.of(byteOutputStream).map(ByteArrayOutputStream::toByteArray);
+ }
+
+ private void cleanup(List<Path> paths) {
Review Comment:
It doesn't change the world, I just looked it up because it was interesting.
If you have the base dir, you can use the file walker approach, which will
iterate over everything in the dir:
```
try (Stream<Path> walk = Files.walk(baseDirectory)) {
walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
}
```
https://stackoverflow.com/questions/35988192/java-nio-most-concise-recursive-directory-delete
Your approach is slightly different, as you have the list of files to delete,
but in the end you want to delete the whole (hopefully empty) directory.
Regardless, I'm fine with the current approach as well.
##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/DebugOperationHandler.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import static java.nio.file.Files.copy;
+import static java.nio.file.Files.createTempDirectory;
+import static java.nio.file.Files.deleteIfExists;
+import static java.nio.file.Files.lines;
+import static java.nio.file.Files.write;
+import static java.util.Optional.empty;
+import static java.util.Optional.ofNullable;
+import static org.apache.commons.compress.utils.IOUtils.closeQuietly;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static org.apache.nifi.c2.protocol.api.OperandType.DEBUG;
+import static org.apache.nifi.c2.protocol.api.OperationType.TRANSFER;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+import org.apache.nifi.c2.client.api.C2Client;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DebugOperationHandler implements C2OperationHandler {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DebugOperationHandler.class);
+
+ private static final String C2_CALLBACK_URL_NOT_FOUND = "C2 Server
callback URL was not found in request";
+ private static final String SUCCESSFUL_UPLOAD = "Debug bundle was uploaded
successfully";
+ private static final String UNABLE_TO_CREATE_BUNDLE = "Unable to create
debug bundle";
+
+ static final String TARGET_ARG = "target";
+ static final String NEW_LINE = "\n";
+
+ private final C2Client c2Client;
+ private final List<Path> bundleFilePaths;
+ private final Predicate<String> contentFilter;
+
+ private DebugOperationHandler(C2Client c2Client, List<Path>
bundleFilePaths, Predicate<String> contentFilter) {
+ this.c2Client = c2Client;
+ this.bundleFilePaths = bundleFilePaths;
+ this.contentFilter = contentFilter;
+ }
+
+ public static DebugOperationHandler create(C2Client c2Client, List<Path>
bundleFilePaths, Predicate<String> contentFilter) {
+ if (c2Client == null) {
+ throw new IllegalArgumentException("C2Client should not be null");
+ }
+ if (bundleFilePaths == null || bundleFilePaths.isEmpty()) {
+ throw new IllegalArgumentException("bundleFilePaths should not be
not null or empty");
+ }
+ if (contentFilter == null) {
+ throw new IllegalArgumentException("Exclude sensitive filter
should not be null");
+ }
+
+ return new DebugOperationHandler(c2Client, bundleFilePaths,
contentFilter);
+ }
+
+ @Override
+ public OperationType getOperationType() {
+ return TRANSFER;
+ }
+
+ @Override
+ public OperandType getOperandType() {
+ return DEBUG;
+ }
+
+ @Override
+ public C2OperationAck handle(C2Operation operation) {
+ String debugCallbackUrl = operation.getArgs().get(TARGET_ARG);
+ if (debugCallbackUrl == null) {
+ LOG.error("Callback URL was not found in C2 request.");
+ return operationAck(operation, operationState(NOT_APPLIED,
C2_CALLBACK_URL_NOT_FOUND));
+ }
+
+ List<Path> contentFilteredFilePaths = null;
+ C2OperationState operationState;
+ try {
+ contentFilteredFilePaths = filterContent(bundleFilePaths);
+ operationState = createDebugBundle(contentFilteredFilePaths)
+ .map(bundle -> c2Client.uploadDebugBundle(debugCallbackUrl,
bundle)
+ .map(errorMessage -> operationState(NOT_APPLIED,
errorMessage))
+ .orElseGet(() -> operationState(FULLY_APPLIED,
SUCCESSFUL_UPLOAD)))
+ .orElseGet(() -> operationState(NOT_APPLIED,
UNABLE_TO_CREATE_BUNDLE));
+ } catch (Exception e) {
+ LOG.error("Unexpected error happened", e);
+ operationState = operationState(NOT_APPLIED,
UNABLE_TO_CREATE_BUNDLE);
+ } finally {
+ ofNullable(contentFilteredFilePaths).ifPresent(this::cleanup);
+ }
+
+ LOG.debug("Returning operation ack with state {} and details {}",
operationState.getState(), operationState.getDetails());
+ return operationAck(operation, operationState);
+ }
+
+ private C2OperationAck operationAck(C2Operation operation,
C2OperationState state) {
+ C2OperationAck operationAck = new C2OperationAck();
+
operationAck.setOperationId(ofNullable(operation.getIdentifier()).orElse(EMPTY));
+ operationAck.setOperationState(state);
+ return operationAck;
+ }
+
+ private C2OperationState operationState(OperationState operationState,
String details) {
+ C2OperationState state = new C2OperationState();
+ state.setState(operationState);
+ state.setDetails(details);
+ return state;
+ }
+
+ private List<Path> filterContent(List<Path> bundleFilePaths) {
+ List<Path> contentFilteredFilePaths = new ArrayList<>();
+ for (Path path : bundleFilePaths) {
+ String fileName = path.getFileName().toString();
+ try (Stream<String> fileStream = lines(path)) {
+ Path tempDirectory = createTempDirectory(null);
+ Path tempFile =
Paths.get(tempDirectory.toAbsolutePath().toString(), fileName);
+ write(tempFile, (Iterable<String>)
fileStream.filter(contentFilter)::iterator);
Review Comment:
Similarly to Paths.get on the line above, I think it would help readability
if we used the qualified Files.write here. I know it is a matter of taste, so
if you feel strongly about keeping the static import, that is OK by me.
##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/DebugOperationHandler.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import static java.nio.file.Files.copy;
+import static java.nio.file.Files.createTempDirectory;
+import static java.nio.file.Files.deleteIfExists;
+import static java.nio.file.Files.lines;
+import static java.nio.file.Files.write;
+import static java.util.Optional.empty;
+import static java.util.Optional.ofNullable;
+import static org.apache.commons.compress.utils.IOUtils.closeQuietly;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static org.apache.nifi.c2.protocol.api.OperandType.DEBUG;
+import static org.apache.nifi.c2.protocol.api.OperationType.TRANSFER;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+import org.apache.nifi.c2.client.api.C2Client;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DebugOperationHandler implements C2OperationHandler {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DebugOperationHandler.class);
+
+ private static final String C2_CALLBACK_URL_NOT_FOUND = "C2 Server
callback URL was not found in request";
+ private static final String SUCCESSFUL_UPLOAD = "Debug bundle was uploaded
successfully";
+ private static final String UNABLE_TO_CREATE_BUNDLE = "Unable to create
debug bundle";
+
+ static final String TARGET_ARG = "target";
+ static final String NEW_LINE = "\n";
+
+ private final C2Client c2Client;
+ private final List<Path> bundleFilePaths;
+ private final Predicate<String> contentFilter;
+
+ private DebugOperationHandler(C2Client c2Client, List<Path>
bundleFilePaths, Predicate<String> contentFilter) {
+ this.c2Client = c2Client;
+ this.bundleFilePaths = bundleFilePaths;
+ this.contentFilter = contentFilter;
+ }
+
+ public static DebugOperationHandler create(C2Client c2Client, List<Path>
bundleFilePaths, Predicate<String> contentFilter) {
+ if (c2Client == null) {
+ throw new IllegalArgumentException("C2Client should not be null");
+ }
+ if (bundleFilePaths == null || bundleFilePaths.isEmpty()) {
+ throw new IllegalArgumentException("bundleFilePaths should not be
not null or empty");
+ }
+ if (contentFilter == null) {
+ throw new IllegalArgumentException("Exclude sensitive filter
should not be null");
+ }
+
+ return new DebugOperationHandler(c2Client, bundleFilePaths,
contentFilter);
+ }
+
+ @Override
+ public OperationType getOperationType() {
+ return TRANSFER;
+ }
+
+ @Override
+ public OperandType getOperandType() {
+ return DEBUG;
+ }
+
+ @Override
+ public C2OperationAck handle(C2Operation operation) {
+ String debugCallbackUrl = operation.getArgs().get(TARGET_ARG);
+ if (debugCallbackUrl == null) {
+ LOG.error("Callback URL was not found in C2 request.");
+ return operationAck(operation, operationState(NOT_APPLIED,
C2_CALLBACK_URL_NOT_FOUND));
+ }
+
+ List<Path> contentFilteredFilePaths = null;
+ C2OperationState operationState;
+ try {
+ contentFilteredFilePaths = filterContent(bundleFilePaths);
+ operationState = createDebugBundle(contentFilteredFilePaths)
+ .map(bundle -> c2Client.uploadDebugBundle(debugCallbackUrl,
bundle)
+ .map(errorMessage -> operationState(NOT_APPLIED,
errorMessage))
+ .orElseGet(() -> operationState(FULLY_APPLIED,
SUCCESSFUL_UPLOAD)))
+ .orElseGet(() -> operationState(NOT_APPLIED,
UNABLE_TO_CREATE_BUNDLE));
+ } catch (Exception e) {
+ LOG.error("Unexpected error happened", e);
+ operationState = operationState(NOT_APPLIED,
UNABLE_TO_CREATE_BUNDLE);
+ } finally {
+ ofNullable(contentFilteredFilePaths).ifPresent(this::cleanup);
+ }
+
+ LOG.debug("Returning operation ack with state {} and details {}",
operationState.getState(), operationState.getDetails());
+ return operationAck(operation, operationState);
+ }
+
+ private C2OperationAck operationAck(C2Operation operation,
C2OperationState state) {
+ C2OperationAck operationAck = new C2OperationAck();
+
operationAck.setOperationId(ofNullable(operation.getIdentifier()).orElse(EMPTY));
+ operationAck.setOperationState(state);
+ return operationAck;
+ }
+
+ private C2OperationState operationState(OperationState operationState,
String details) {
+ C2OperationState state = new C2OperationState();
+ state.setState(operationState);
+ state.setDetails(details);
+ return state;
+ }
+
+ private List<Path> filterContent(List<Path> bundleFilePaths) {
+ List<Path> contentFilteredFilePaths = new ArrayList<>();
+ for (Path path : bundleFilePaths) {
+ String fileName = path.getFileName().toString();
+ try (Stream<String> fileStream = lines(path)) {
+ Path tempDirectory = createTempDirectory(null);
+ Path tempFile =
Paths.get(tempDirectory.toAbsolutePath().toString(), fileName);
+ write(tempFile, (Iterable<String>)
fileStream.filter(contentFilter)::iterator);
+ contentFilteredFilePaths.add(tempFile);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ return contentFilteredFilePaths;
+ }
+
+ private Optional<byte[]> createDebugBundle(List<Path> filePaths) {
+ ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
+ try (GzipCompressorOutputStream gzipCompressorOutputStream = new
GzipCompressorOutputStream(byteOutputStream);
+ TarArchiveOutputStream tarOutputStream = new
TarArchiveOutputStream(gzipCompressorOutputStream)) {
+ for (Path filePath : filePaths) {
+ TarArchiveEntry tarArchiveEntry = new
TarArchiveEntry(filePath.toFile(), filePath.getFileName().toString());
+ tarOutputStream.putArchiveEntry(tarArchiveEntry);
+ copy(filePath, tarOutputStream);
+ tarOutputStream.closeArchiveEntry();
+ }
+ tarOutputStream.finish();
+ } catch (Exception e) {
+ LOG.error("Error during create compressed bundle", e);
+ return empty();
+ } finally {
+ closeQuietly(byteOutputStream);
+ }
+ return
Optional.of(byteOutputStream).map(ByteArrayOutputStream::toByteArray);
+ }
+
+ private void cleanup(List<Path> paths) {
+ Optional<Path> firstPath = paths.stream().findFirst();
+ if (firstPath.isPresent()) {
+ Path baseDirectory = firstPath.get().getParent();
+ paths.forEach(path -> deleteQuietly(path, "Unable to delete
temporary file"));
+ deleteQuietly(baseDirectory, "Unable to delete temporary
directory");
+ }
+ }
+
+ private static void deleteQuietly(Path baseDirectory, String errorMessage)
{
Review Comment:
Is it intentional that this method is static while, for example, cleanup is not?
##########
c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DebugOperationHandlerTest.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.write;
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static
org.apache.nifi.c2.client.service.operation.DebugOperationHandler.NEW_LINE;
+import static
org.apache.nifi.c2.client.service.operation.DebugOperationHandler.TARGET_ARG;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static org.apache.nifi.c2.protocol.api.OperandType.DEBUG;
+import static org.apache.nifi.c2.protocol.api.OperationType.TRANSFER;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import org.apache.nifi.c2.client.api.C2Client;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class DebugOperationHandlerTest {
+
+ private static final String OPERATION_ID = "operationId";
+ private static final String C2_DEBUG_UPLOAD_ENDPOINT =
"https://host/c2/api/upload";
+ private static final String DEFAULT_FILE_CONTENT = "some_textual_data";
+ private static final List<Path> VALID_BUNDLE_FILE_LIST =
singletonList(Paths.get("path_to_file"));
+ private static final Predicate<String> DEFAULT_CONTENT_FILTER = text ->
true;
+
+ @Mock
+ private C2Client c2Client;
+
+ @TempDir
+ private File tempDir;
+
+ private static Stream<Arguments> invalidConstructorArguments() {
+ C2Client mockC2Client = mock(C2Client.class);
+ return Stream.of(
+ Arguments.of(null, null, null),
+ Arguments.of(null, VALID_BUNDLE_FILE_LIST, DEFAULT_CONTENT_FILTER),
+ Arguments.of(mockC2Client, null, DEFAULT_CONTENT_FILTER),
+ Arguments.of(mockC2Client, emptyList(), DEFAULT_CONTENT_FILTER),
+ Arguments.of(mockC2Client, VALID_BUNDLE_FILE_LIST, null)
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("invalidConstructorArguments")
+ public void
testAttemptingCreateWithInvalidParametersWillThrowException(C2Client c2Client,
List<Path> bundleFilePaths, Predicate<String> contentFilter) {
+ assertThrows(IllegalArgumentException.class, () ->
DebugOperationHandler.create(c2Client, bundleFilePaths, contentFilter));
+ }
+
+ @Test
+ public void testOperationAndOperandTypesAreMatching() {
+ // given
+ DebugOperationHandler testHandler =
DebugOperationHandler.create(c2Client, VALID_BUNDLE_FILE_LIST,
DEFAULT_CONTENT_FILTER);
+
+ // when + then
+ assertEquals(TRANSFER, testHandler.getOperationType());
+ assertEquals(DEBUG, testHandler.getOperandType());
+ }
+
+ @Test
+ public void testC2CallbackUrlIsNullInArgs() {
+ // given
+ DebugOperationHandler testHandler =
DebugOperationHandler.create(c2Client, VALID_BUNDLE_FILE_LIST,
DEFAULT_CONTENT_FILTER);
+ C2Operation c2Operation = operation(null);
+
+ // when
+ C2OperationAck result = testHandler.handle(c2Operation);
+
+ // then
+ assertEquals(OPERATION_ID, result.getOperationId());
+ assertEquals(NOT_APPLIED, result.getOperationState().getState());
+ }
+
+ @Test
+ public void testFilesAreCollectedAndUploadedAsATarGzBundle() {
+ // given
+ Map<String, String> bundleFileNamesWithContents = asList("file.log",
"application.conf", "default.properties")
Review Comment:
Very minor: you could use Stream.of() here.
##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/DebugOperationHandler.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import static java.nio.file.Files.copy;
+import static java.nio.file.Files.createTempDirectory;
+import static java.nio.file.Files.deleteIfExists;
+import static java.nio.file.Files.lines;
+import static java.nio.file.Files.write;
+import static java.util.Optional.empty;
+import static java.util.Optional.ofNullable;
+import static org.apache.commons.compress.utils.IOUtils.closeQuietly;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static org.apache.nifi.c2.protocol.api.OperandType.DEBUG;
+import static org.apache.nifi.c2.protocol.api.OperationType.TRANSFER;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+import org.apache.nifi.c2.client.api.C2Client;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DebugOperationHandler implements C2OperationHandler {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DebugOperationHandler.class);
+
+ private static final String C2_CALLBACK_URL_NOT_FOUND = "C2 Server
callback URL was not found in request";
+ private static final String SUCCESSFUL_UPLOAD = "Debug bundle was uploaded
successfully";
+ private static final String UNABLE_TO_CREATE_BUNDLE = "Unable to create
debug bundle";
+
+ static final String TARGET_ARG = "target";
+ static final String NEW_LINE = "\n";
+
+ private final C2Client c2Client;
+ private final List<Path> bundleFilePaths;
+ private final Predicate<String> contentFilter;
+
+ private DebugOperationHandler(C2Client c2Client, List<Path>
bundleFilePaths, Predicate<String> contentFilter) {
+ this.c2Client = c2Client;
+ this.bundleFilePaths = bundleFilePaths;
+ this.contentFilter = contentFilter;
+ }
+
+ public static DebugOperationHandler create(C2Client c2Client, List<Path>
bundleFilePaths, Predicate<String> contentFilter) {
+ if (c2Client == null) {
+ throw new IllegalArgumentException("C2Client should not be null");
+ }
+ if (bundleFilePaths == null || bundleFilePaths.isEmpty()) {
+ throw new IllegalArgumentException("bundleFilePaths should not be
not null or empty");
+ }
+ if (contentFilter == null) {
+ throw new IllegalArgumentException("Exclude sensitive filter
should not be null");
+ }
+
+ return new DebugOperationHandler(c2Client, bundleFilePaths,
contentFilter);
+ }
+
+ @Override
+ public OperationType getOperationType() {
+ return TRANSFER;
+ }
+
+ @Override
+ public OperandType getOperandType() {
+ return DEBUG;
+ }
+
+ @Override
+ public C2OperationAck handle(C2Operation operation) {
+ String debugCallbackUrl = operation.getArgs().get(TARGET_ARG);
+ if (debugCallbackUrl == null) {
+ LOG.error("Callback URL was not found in C2 request.");
+ return operationAck(operation, operationState(NOT_APPLIED,
C2_CALLBACK_URL_NOT_FOUND));
+ }
+
+ List<Path> contentFilteredFilePaths = null;
+ C2OperationState operationState;
+ try {
+ contentFilteredFilePaths = filterContent(bundleFilePaths);
+ operationState = createDebugBundle(contentFilteredFilePaths)
+ .map(bundle -> c2Client.uploadDebugBundle(debugCallbackUrl,
bundle)
+ .map(errorMessage -> operationState(NOT_APPLIED,
errorMessage))
+ .orElseGet(() -> operationState(FULLY_APPLIED,
SUCCESSFUL_UPLOAD)))
+ .orElseGet(() -> operationState(NOT_APPLIED,
UNABLE_TO_CREATE_BUNDLE));
+ } catch (Exception e) {
+ LOG.error("Unexpected error happened", e);
+ operationState = operationState(NOT_APPLIED,
UNABLE_TO_CREATE_BUNDLE);
+ } finally {
+ ofNullable(contentFilteredFilePaths).ifPresent(this::cleanup);
+ }
+
+ LOG.debug("Returning operation ack with state {} and details {}",
operationState.getState(), operationState.getDetails());
+ return operationAck(operation, operationState);
+ }
+
+ private C2OperationAck operationAck(C2Operation operation,
C2OperationState state) {
+ C2OperationAck operationAck = new C2OperationAck();
+
operationAck.setOperationId(ofNullable(operation.getIdentifier()).orElse(EMPTY));
+ operationAck.setOperationState(state);
+ return operationAck;
+ }
+
+ private C2OperationState operationState(OperationState operationState,
String details) {
+ C2OperationState state = new C2OperationState();
+ state.setState(operationState);
+ state.setDetails(details);
+ return state;
+ }
+
+ private List<Path> filterContent(List<Path> bundleFilePaths) {
+ List<Path> contentFilteredFilePaths = new ArrayList<>();
+ for (Path path : bundleFilePaths) {
+ String fileName = path.getFileName().toString();
+ try (Stream<String> fileStream = lines(path)) {
+ Path tempDirectory = createTempDirectory(null);
Review Comment:
Maybe it would be useful to provide some kind of prefix for the temporary
directory. If something goes wrong on the agent side, a recognizable prefix
would make it easier to search for and identify these directories during a
later investigation. What do you think?
##########
c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DebugOperationHandlerTest.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.write;
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static
org.apache.nifi.c2.client.service.operation.DebugOperationHandler.NEW_LINE;
+import static
org.apache.nifi.c2.client.service.operation.DebugOperationHandler.TARGET_ARG;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static org.apache.nifi.c2.protocol.api.OperandType.DEBUG;
+import static org.apache.nifi.c2.protocol.api.OperationType.TRANSFER;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import org.apache.nifi.c2.client.api.C2Client;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class DebugOperationHandlerTest {
+
+ private static final String OPERATION_ID = "operationId";
+ private static final String C2_DEBUG_UPLOAD_ENDPOINT =
"https://host/c2/api/upload";
+ private static final String DEFAULT_FILE_CONTENT = "some_textual_data";
+ private static final List<Path> VALID_BUNDLE_FILE_LIST =
singletonList(Paths.get("path_to_file"));
+ private static final Predicate<String> DEFAULT_CONTENT_FILTER = text ->
true;
+
+ @Mock
+ private C2Client c2Client;
+
+ @TempDir
+ private File tempDir;
+
+ private static Stream<Arguments> invalidConstructorArguments() {
+ C2Client mockC2Client = mock(C2Client.class);
+ return Stream.of(
+ Arguments.of(null, null, null),
+ Arguments.of(null, VALID_BUNDLE_FILE_LIST, DEFAULT_CONTENT_FILTER),
+ Arguments.of(mockC2Client, null, DEFAULT_CONTENT_FILTER),
+ Arguments.of(mockC2Client, emptyList(), DEFAULT_CONTENT_FILTER),
+ Arguments.of(mockC2Client, VALID_BUNDLE_FILE_LIST, null)
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("invalidConstructorArguments")
+ public void
testAttemptingCreateWithInvalidParametersWillThrowException(C2Client c2Client,
List<Path> bundleFilePaths, Predicate<String> contentFilter) {
+ assertThrows(IllegalArgumentException.class, () ->
DebugOperationHandler.create(c2Client, bundleFilePaths, contentFilter));
+ }
+
+ @Test
+ public void testOperationAndOperandTypesAreMatching() {
+ // given
+ DebugOperationHandler testHandler =
DebugOperationHandler.create(c2Client, VALID_BUNDLE_FILE_LIST,
DEFAULT_CONTENT_FILTER);
+
+ // when + then
+ assertEquals(TRANSFER, testHandler.getOperationType());
+ assertEquals(DEBUG, testHandler.getOperandType());
+ }
+
+ @Test
+ public void testC2CallbackUrlIsNullInArgs() {
+ // given
+ DebugOperationHandler testHandler =
DebugOperationHandler.create(c2Client, VALID_BUNDLE_FILE_LIST,
DEFAULT_CONTENT_FILTER);
+ C2Operation c2Operation = operation(null);
+
+ // when
+ C2OperationAck result = testHandler.handle(c2Operation);
+
+ // then
+ assertEquals(OPERATION_ID, result.getOperationId());
+ assertEquals(NOT_APPLIED, result.getOperationState().getState());
+ }
+
+ @Test
+ public void testFilesAreCollectedAndUploadedAsATarGzBundle() {
+ // given
+ Map<String, String> bundleFileNamesWithContents = asList("file.log",
"application.conf", "default.properties")
+ .stream()
+ .collect(toMap(identity(), __ -> DEFAULT_FILE_CONTENT));
+ List<Path> createBundleFiles =
bundleFileNamesWithContents.entrySet().stream()
+ .map(entry -> placeFileWithContent(entry.getKey(),
entry.getValue()))
+ .collect(toList());
+ DebugOperationHandler testHandler =
DebugOperationHandler.create(c2Client, createBundleFiles,
DEFAULT_CONTENT_FILTER);
+ C2Operation c2Operation = operation(C2_DEBUG_UPLOAD_ENDPOINT);
+
+ // when
+ C2OperationAck result = testHandler.handle(c2Operation);
+
+ // then
+ ArgumentCaptor<String> uploadUrlCaptor =
ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<byte[]> uploadBundleCaptor =
ArgumentCaptor.forClass(byte[].class);
+ verify(c2Client).uploadDebugBundle(uploadUrlCaptor.capture(),
uploadBundleCaptor.capture());
+ assertEquals(OPERATION_ID, result.getOperationId());
+ assertEquals(FULLY_APPLIED, result.getOperationState().getState());
+ assertEquals(C2_DEBUG_UPLOAD_ENDPOINT, uploadUrlCaptor.getValue());
+ Map<String, String> resultBundle =
extractBundle(uploadBundleCaptor.getValue());
+ assertTrue(mapEqual(bundleFileNamesWithContents, resultBundle));
+ }
+
+ @Test
+ public void testFileToCollectDoesNotExist() {
+ // given
+ DebugOperationHandler testHandler =
DebugOperationHandler.create(c2Client,
singletonList(Paths.get(tempDir.getAbsolutePath(), "missing_file")),
DEFAULT_CONTENT_FILTER);
+ C2Operation c2Operation = operation(C2_DEBUG_UPLOAD_ENDPOINT);
+
+ // when
+ C2OperationAck result = testHandler.handle(c2Operation);
+
+ // then
+ assertEquals(OPERATION_ID, result.getOperationId());
+ assertEquals(NOT_APPLIED, result.getOperationState().getState());
+ }
+
+ @Test
+ public void testContentIsFilteredOut() {
+ // given
+ String filterKeyword = "minifi";
+ String bundleFileName = "filter_content.file";
+ String fileContent = Stream.of("line one", "line two " +
filterKeyword, filterKeyword + "line three", "line
four").collect(joining(NEW_LINE));
Review Comment:
What do you think about extending this with a case where the keyword appears
in the middle of the string, just to be safe? :)
It could also be worth validating what happens if, for some reason, everything
or nothing is filtered out. I know it is a super edge case, but it could be
interesting.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]