This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new bf5b6f53ae HDDS-7867. Clean up replication logs (#4397)
bf5b6f53ae is described below
commit bf5b6f53ae5eb01b8917577c6e0e59771095322a
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Wed Mar 15 19:05:03 2023 +0100
HDDS-7867. Clean up replication logs (#4397)
---
.../reconstruction/ECReconstructionCommandInfo.java | 7 ++++---
.../ECReconstructionCoordinatorTask.java | 6 ++++--
.../replication/AbstractReplicationTask.java | 19 +++++++++++++++++++
.../replication/GrpcContainerUploader.java | 2 +-
.../container/replication/GrpcOutputStream.java | 20 +++++++++++++++-----
.../replication/GrpcReplicationService.java | 2 +-
.../ozone/container/replication/PushReplicator.java | 1 +
.../replication/ReplicationSupervisor.java | 18 ++++++++++--------
.../container/replication/ReplicationTask.java | 21 ++++++++++++---------
.../replication/SendContainerRequestHandler.java | 15 +++++++++------
.../commands/ReplicateContainerCommand.java | 10 +++++-----
.../container/replication/GrpcOutputStreamTest.java | 11 +++++++++++
.../replication/UnhealthyReplicationProcessor.java | 6 ++++--
.../dist/src/main/compose/ozonesecure/docker-config | 3 +--
14 files changed, 97 insertions(+), 44 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java
index c053a9ae92..648a63be42 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java
@@ -29,6 +29,7 @@ import java.util.stream.IntStream;
import static java.util.Collections.unmodifiableSortedMap;
import static java.util.stream.Collectors.toMap;
+import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reconstructECContainersCommand;
/**
* This class is to keep the required EC reconstruction info.
@@ -86,12 +87,12 @@ public class ECReconstructionCommandInfo {
@Override
public String toString() {
- return "ECReconstructionCommand{"
- + "containerID=" + containerID
+ return reconstructECContainersCommand
+ + ": containerID=" + containerID
+ ", replication=" + ecReplicationConfig.getReplication()
+ ", missingIndexes=" + Arrays.toString(missingContainerIndexes)
+ ", sources=" + sourceNodeMap
- + ", targets=" + targetNodeMap + "}";
+ + ", targets=" + targetNodeMap;
}
public long getTerm() {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java
index fe28b9c142..c24e060580 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java
@@ -33,6 +33,7 @@ public class ECReconstructionCoordinatorTask
LoggerFactory.getLogger(ECReconstructionCoordinatorTask.class);
private final ECReconstructionCoordinator reconstructionCoordinator;
private final ECReconstructionCommandInfo reconstructionCommandInfo;
+ private final String debugString;
public ECReconstructionCoordinatorTask(
ECReconstructionCoordinator coordinator,
@@ -42,6 +43,7 @@ public class ECReconstructionCoordinatorTask
reconstructionCommandInfo.getTerm());
this.reconstructionCoordinator = coordinator;
this.reconstructionCommandInfo = reconstructionCommandInfo;
+ debugString = reconstructionCommandInfo.toString();
}
@Override
@@ -81,8 +83,8 @@ public class ECReconstructionCoordinatorTask
}
@Override
- public String toString() {
- return "ECReconstructionTask{info=" + reconstructionCommandInfo + '}';
+ protected Object getCommandForDebug() {
+ return debugString;
}
@Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java
index b5e11708ef..72fa88b35d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java
@@ -140,4 +140,23 @@ public abstract class AbstractReplicationTask {
boolean runOnInServiceOnly) {
this.shouldOnlyRunOnInServiceDatanodes = runOnInServiceOnly;
}
+
+ /**
+ * Hook for subclasses to provide info about the command.
+ * @return string representation of the command
+ */
+ protected Object getCommandForDebug() {
+ return "";
+ };
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder()
+ .append(getStatus()).append(" ")
+ .append(getCommandForDebug());
+ if (getStatus() == Status.QUEUED) {
+ sb.append(", queued at ").append(getQueued());
+ }
+ return sb.toString();
+ }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java
index a3f1cdee51..3f6fdc1ab5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java
@@ -105,7 +105,7 @@ public class GrpcContainerUploader implements ContainerUploader {
@Override
public void onNext(SendContainerResponse sendContainerResponse) {
- LOG.info("Response for upload container {} to {}", containerId, target);
+ LOG.debug("Response for upload container {} to {}", containerId, target);
}
@Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStream.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStream.java
index 280e8cef75..7a4e59f7b6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStream.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStream.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.ozone.container.replication;
+import com.google.common.base.Preconditions;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
@@ -25,6 +26,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Adapter from {@code OutputStream} to gRPC {@code StreamObserver}.
@@ -43,6 +45,8 @@ abstract class GrpcOutputStream<T> extends OutputStream {
private final int bufferSize;
+ private final AtomicBoolean closed = new AtomicBoolean();
+
private long writtenBytes;
GrpcOutputStream(StreamObserver<T> streamObserver,
@@ -55,6 +59,8 @@ abstract class GrpcOutputStream<T> extends OutputStream {
@Override
public void write(int b) {
+ Preconditions.checkState(!closed.get(), "stream is closed");
+
try {
buffer.write(b);
if (buffer.size() >= bufferSize) {
@@ -67,6 +73,8 @@ abstract class GrpcOutputStream<T> extends OutputStream {
@Override
public void write(@Nonnull byte[] data, int offset, int length) {
+ Preconditions.checkState(!closed.get(), "stream is closed");
+
if ((offset < 0) || (offset > data.length) || (length < 0) ||
((offset + length) > data.length) || ((offset + length) < 0)) {
throw new IndexOutOfBoundsException();
@@ -98,11 +106,13 @@ abstract class GrpcOutputStream<T> extends OutputStream {
@Override
public void close() throws IOException {
- flushBuffer(true);
- LOG.info("Sent {} bytes for container {}",
- writtenBytes, containerId);
- streamObserver.onCompleted();
- buffer.close();
+ if (!closed.getAndSet(true)) {
+ flushBuffer(true);
+ LOG.info("Sent {} bytes for container {}",
+ writtenBytes, containerId);
+ streamObserver.onCompleted();
+ buffer.close();
+ }
}
protected long getContainerId() {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
index fa0b680970..cf161143b6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
@@ -67,7 +67,7 @@ public class GrpcReplicationService extends
responseObserver, containerID, BUFFER_SIZE);
source.copyData(containerID, outputStream, compression);
} catch (IOException e) {
- LOG.error("Error streaming container {}", containerID, e);
+ LOG.warn("Error streaming container {}", containerID, e);
responseObserver.onError(e);
} finally {
// output may have already been closed, ignore such errors
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
index 54675cbbf3..0de728c875 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
@@ -66,6 +66,7 @@ public class PushReplicator implements ContainerReplicator {
uploader.startUpload(containerID, target, fut, compression));
source.copyData(containerID, output, compression);
fut.get();
+
task.setTransferredBytes(output.getByteCount());
task.setStatus(Status.DONE);
} catch (Exception e) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
index 6ad5c41d7c..cd78e758de 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.container.replication;
import java.time.Clock;
+import java.time.Instant;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
@@ -182,10 +183,11 @@ public class ReplicationSupervisor {
try {
requestCounter.incrementAndGet();
- if (task.getDeadline() > 0 && clock.millis() > task.getDeadline()) {
- LOG.info("Ignoring" +
- " {} since the current time {}ms is past the deadline {}ms",
- this, clock.millis(), task.getDeadline());
+ final long now = clock.millis();
+ final long deadline = task.getDeadline();
+ if (deadline > 0 && now > deadline) {
+ LOG.info("Ignoring {} since the deadline has passed ({} < {})",
+ this, Instant.ofEpochMilli(deadline), Instant.ofEpochMilli(now));
timeoutCounter.incrementAndGet();
return;
}
@@ -195,8 +197,8 @@ public class ReplicationSupervisor {
if (dn != null && dn.getPersistedOpState() !=
HddsProtos.NodeOperationalState.IN_SERVICE
&& task.shouldOnlyRunOnInServiceDatanodes()) {
- LOG.info("Dn is of {} state. Ignore {}",
- dn.getPersistedOpState(), this);
+ LOG.info("Ignoring {} since datanode is not in service ({})",
+ this, dn.getPersistedOpState());
return;
}
@@ -212,7 +214,7 @@ public class ReplicationSupervisor {
task.setStatus(Status.IN_PROGRESS);
task.runTask();
if (task.getStatus() == Status.FAILED) {
- LOG.error("Failed {}", this);
+ LOG.warn("Failed {}", this);
failureCounter.incrementAndGet();
} else if (task.getStatus() == Status.DONE) {
LOG.info("Successful {}", this);
@@ -223,7 +225,7 @@ public class ReplicationSupervisor {
}
} catch (Exception e) {
task.setStatus(Status.FAILED);
- LOG.error("Failed {}", this, e);
+ LOG.warn("Failed {}", this, e);
failureCounter.incrementAndGet();
} finally {
inFlight.remove(task);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
index a163b51f25..ca0ca98906 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
@@ -30,6 +30,7 @@ public class ReplicationTask extends AbstractReplicationTask {
private final ReplicateContainerCommand cmd;
private final ContainerReplicator replicator;
+ private final String debugString;
/**
* Counter for the transferred bytes.
@@ -49,6 +50,7 @@ public class ReplicationTask extends AbstractReplicationTask {
// run.
setShouldOnlyRunOnInServiceDatanodes(false);
}
+ debugString = cmd.toString();
}
/**
@@ -89,13 +91,18 @@ public class ReplicationTask extends AbstractReplicationTask {
return cmd.getSourceDatanodes();
}
+ @Override
+ protected Object getCommandForDebug() {
+ return debugString;
+ }
+
@Override
public String toString() {
- return "ReplicationTask{" +
- "status=" + getStatus() +
- ", cmd={" + cmd + "}" +
- ", queued=" + getQueued() +
- '}';
+ String str = super.toString();
+ if (transferredBytes > 0) {
+ str += ", transferred " + transferredBytes + " bytes";
+ }
+ return str;
}
public long getTransferredBytes() {
@@ -110,10 +117,6 @@ public class ReplicationTask extends AbstractReplicationTask {
return cmd.getTargetDatanode();
}
- ReplicateContainerCommand getCommand() {
- return cmd;
- }
-
@Override
public void runTask() {
replicator.replicate(this);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
index 02a30986c4..16a7d215b8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
@@ -63,7 +63,7 @@ class SendContainerRequestHandler
public void onNext(SendContainerRequest req) {
try {
final long length = req.getData().size();
- LOG.info("Received part for container id:{} offset:{} len:{}",
+ LOG.debug("Received part for container id:{} offset:{} len:{}",
req.getContainerID(), req.getOffset(), length);
assertSame(nextOffset, req.getOffset(), "offset");
@@ -76,6 +76,8 @@ class SendContainerRequestHandler
path = dir.resolve(ContainerUtils.getContainerTarName(containerId));
output = Files.newOutputStream(path);
compression = CopyContainerCompression.fromProto(req.getCompression());
+
+ LOG.info("Accepting container {}", req.getContainerID());
}
assertSame(containerId, req.getContainerID(), "containerID");
@@ -90,7 +92,7 @@ class SendContainerRequestHandler
@Override
public void onError(Throwable t) {
- LOG.error("Error", t);
+ LOG.warn("Error receiving container {} at {}", containerId, nextOffset, t);
closeOutput();
deleteTarball();
responseObserver.onError(t);
@@ -103,23 +105,24 @@ class SendContainerRequestHandler
return;
}
- LOG.info("Received all parts for container {}", containerId);
+ LOG.info("Container {} is downloaded with size {}, starting to import.",
+ containerId, nextOffset);
closeOutput();
try {
importer.importContainer(containerId, path, volume, compression);
- LOG.info("Imported container {}", containerId);
+ LOG.info("Container {} is replicated successfully", containerId);
responseObserver.onNext(SendContainerResponse.newBuilder().build());
responseObserver.onCompleted();
} catch (Throwable t) {
- LOG.info("Failed to import container {}", containerId, t);
+ LOG.warn("Failed to import container {}", containerId, t);
deleteTarball();
responseObserver.onError(t);
}
}
private void closeOutput() {
- IOUtils.cleanupWithLogger(LOG, output);
+ IOUtils.close(LOG, output);
output = null;
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
index aae40d87e9..5a8dc378cb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
@@ -164,14 +164,14 @@ public final class ReplicateContainerCommand
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(getType());
- sb.append(": containerId: ").append(getContainerID());
- sb.append(", replicaIndex: ").append(getReplicaIndex());
+ sb.append(": containerId=").append(getContainerID());
+ sb.append(", replicaIndex=").append(getReplicaIndex());
if (targetDatanode != null) {
- sb.append(", targetNode: ").append(targetDatanode);
+ sb.append(", targetNode=").append(targetDatanode);
} else {
- sb.append(", sourceNodes: ").append(sourceDatanodes);
+ sb.append(", sourceNodes=").append(sourceDatanodes);
}
- sb.append(", priority: ").append(priority);
+ sb.append(", priority=").append(priority);
return sb.toString();
}
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStreamTest.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStreamTest.java
index fe1b9f785b..57ea91bea0 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStreamTest.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStreamTest.java
@@ -35,6 +35,7 @@ import java.util.Random;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -158,6 +159,16 @@ abstract class GrpcOutputStreamTest<T> {
verifyResponses(concat(bytes1, bytes2));
}
+ @Test
+ void rejectsWriteAfterClose() throws IOException {
+ subject.close();
+
+ assertThrows(IllegalStateException.class, () -> subject.write(42));
+ assertThrows(IllegalStateException.class, () -> writeBytes(subject, 42));
+
+ subject.close(); // close is idempotent
+ }
+
private void verifyResponses(byte[] bytes) {
int expectedResponseCount = bytes.length / bufferSize;
if (bytes.length % bufferSize > 0) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
index 372d70c5a7..bbaa23ca8c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
@@ -110,8 +110,10 @@ public abstract class UnhealthyReplicationProcessor<HealthResult extends
failedOnes.forEach(result ->
requeueHealthResultFromQueue(replicationManager, result));
- LOG.info("Processed {} containers with health state counts {}," +
- "failed processing {}", processed, healthStateCntMap, failed);
+ if (processed > 0 || failed > 0) {
+ LOG.info("Processed {} containers with health state counts {}, " +
+ "failed processing {}", processed, healthStateCntMap, failed);
+ }
}
/**
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
index 0e8f746f6d..0ae5887094 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
@@ -79,8 +79,7 @@ OZONE-SITE.XML_ozone.httpfs.kerberos.keytab.file=/etc/security/keytabs/httpfs.ke
OZONE-SITE.XML_ozone.httpfs.kerberos.principal=httpfs/[email protected]
OZONE-SITE.XML_hdds.scm.replication.thread.interval=5s
-OZONE-SITE.XML_hdds.scm.replication.event.timeout=10s
-OZONE-SITE.XML_hdds.scm.replication.push=true
+OZONE-SITE.XML_hdds.scm.replication.enable.legacy=false
OZONE-SITE.XML_ozone.scm.stale.node.interval=30s
OZONE-SITE.XML_ozone.scm.dead.node.interval=45s
OZONE-SITE.XML_hdds.container.report.interval=60s
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]