This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 8bb3d6e7af HDDS-7151. Avoid using GeneratedMessage in non-generated code (#3699)
8bb3d6e7af is described below
commit 8bb3d6e7affb097c0d8b5b2faef8266bf7c21495
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Tue Aug 23 11:02:50 2022 +0200
HDDS-7151. Avoid using GeneratedMessage in non-generated code (#3699)
---
.../container/common/report/ReportManager.java | 4 +-
.../container/common/report/ReportPublisher.java | 6 +--
.../common/report/ReportPublisherFactory.java | 6 +--
.../common/statemachine/StateContext.java | 56 +++++++++++-----------
.../states/endpoint/HeartbeatEndpointTask.java | 6 +--
.../protocol/commands/CommandForDatanode.java | 4 +-
.../hadoop/ozone/protocol/commands/SCMCommand.java | 4 +-
.../common/report/TestReportPublisher.java | 6 +--
.../common/statemachine/TestStateContext.java | 22 ++++-----
.../apache/hadoop/hdds/utils/db/CodecRegistry.java | 22 +++++----
.../replication/LegacyReplicationManager.java | 4 +-
.../apache/hadoop/hdds/scm/ha/StatefulService.java | 12 ++---
.../apache/hadoop/hdds/scm/ha/io/CodecFactory.java | 11 +++--
.../apache/hadoop/hdds/scm/ha/io/EnumCodec.java | 2 +-
.../hdds/scm/ha/io/GeneratedMessageCodec.java | 12 ++---
.../apache/hadoop/hdds/scm/ha/io/ListCodec.java | 2 +-
.../scm/server/SCMDatanodeHeartbeatDispatcher.java | 4 +-
17 files changed, 95 insertions(+), 88 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java
index eb02f5154f..97d7fe2d93 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.util.concurrent.HadoopExecutors;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -129,7 +129,7 @@ public final class ReportManager {
*
* @return ReportManager.Builder
*/
- public Builder addPublisherFor(Class<? extends GeneratedMessage> report) {
+ public Builder addPublisherFor(Class<? extends Message> report) {
reportPublishers.add(publisherFactory.getPublisherFor(report));
return this;
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
index 8d4820e60f..3d3c819c05 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
* Abstract class responsible for scheduling the reports based on the
* configured interval. All the ReportPublishers should extend this class.
*/
-public abstract class ReportPublisher<T extends GeneratedMessage>
+public abstract class ReportPublisher<T extends Message>
implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(
@@ -81,7 +81,7 @@ public abstract class ReportPublisher<T extends GeneratedMessage>
*/
private void publishReport() {
try {
- GeneratedMessage report = getReport();
+ Message report = getReport();
if (report instanceof CommandStatusReportsProto) {
context.addIncrementalReport(report);
} else {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
index 82a3c41b19..3be1b5e077 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
-import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.Message;
/**
* Factory class to construct {@link ReportPublisher} for a report.
@@ -35,7 +35,7 @@ import com.google.protobuf.GeneratedMessage;
public class ReportPublisherFactory {
private final ConfigurationSource conf;
- private final Map<Class<? extends GeneratedMessage>,
+ private final Map<Class<? extends Message>,
Class<? extends ReportPublisher>> report2publisher;
/**
@@ -65,7 +65,7 @@ public class ReportPublisherFactory {
* @return report publisher
*/
public ReportPublisher getPublisherFor(
- Class<? extends GeneratedMessage> report) {
+ Class<? extends Message> report) {
Class<? extends ReportPublisher> publisherClass =
report2publisher.get(report);
if (publisherClass == null) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 087226c064..7a7f792613 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -62,7 +62,7 @@ import org.apache.hadoop.ozone.protocol.commands.DeleteBlockCommandStatus.Delete
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import com.google.common.base.Preconditions;
-import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.Message;
import static java.lang.Math.min;
import static org.apache.hadoop.hdds.utils.HddsServerUtil.getLogWarnInterval;
import static org.apache.hadoop.hdds.utils.HddsServerUtil.getReconHeartbeatInterval;
@@ -108,12 +108,12 @@ public class StateContext {
private final ConfigurationSource conf;
private final Set<InetSocketAddress> endpoints;
// Only the latest full report of each type is kept
- private final AtomicReference<GeneratedMessage> containerReports;
- private final AtomicReference<GeneratedMessage> nodeReport;
- private final AtomicReference<GeneratedMessage> pipelineReports;
- private final AtomicReference<GeneratedMessage> crlStatusReport;
+ private final AtomicReference<Message> containerReports;
+ private final AtomicReference<Message> nodeReport;
+ private final AtomicReference<Message> pipelineReports;
+ private final AtomicReference<Message> crlStatusReport;
// Incremental reports are queued in the map below
- private final Map<InetSocketAddress, List<GeneratedMessage>>
+ private final Map<InetSocketAddress, List<Message>>
incrementalReportsQueue;
private final Map<InetSocketAddress, Queue<ContainerAction>>
containerActions;
private final Map<InetSocketAddress, Queue<PipelineAction>> pipelineActions;
@@ -129,7 +129,7 @@ public class StateContext {
// List of supported full report types.
private final List<String> fullReportTypeList;
// ReportType -> Report.
- private final Map<String, AtomicReference<GeneratedMessage>> type2Reports;
+ private final Map<String, AtomicReference<Message>> type2Reports;
/**
* term of latest leader SCM, extract from SCMCommand.
@@ -283,7 +283,7 @@ public class StateContext {
*
* @param report report to be added
*/
- public void addIncrementalReport(GeneratedMessage report) {
+ public void addIncrementalReport(Message report) {
if (report == null) {
return;
}
@@ -306,7 +306,7 @@ public class StateContext {
*
* @param report report to be refreshed
*/
- public void refreshFullReport(GeneratedMessage report) {
+ public void refreshFullReport(Message report) {
if (report == null) {
return;
}
@@ -333,14 +333,14 @@ public class StateContext {
* @param reportsToPutBack list of reports which failed to be sent by
* heartbeat.
*/
- public void putBackReports(List<GeneratedMessage> reportsToPutBack,
+ public void putBackReports(List<Message> reportsToPutBack,
InetSocketAddress endpoint) {
if (LOG.isDebugEnabled()) {
LOG.debug("endpoint: {}, size of reportsToPutBack: {}",
endpoint, reportsToPutBack.size());
}
// We don't expect too much reports to be put back
- for (GeneratedMessage report : reportsToPutBack) {
+ for (Message report : reportsToPutBack) {
final Descriptor descriptor = report.getDescriptorForType();
Preconditions.checkState(descriptor != null);
final String reportType = descriptor.getFullName();
@@ -359,7 +359,7 @@ public class StateContext {
*
* @return List of reports
*/
- public List<GeneratedMessage> getAllAvailableReports(
+ public List<Message> getAllAvailableReports(
InetSocketAddress endpoint
) {
int maxLimit = Integer.MAX_VALUE;
@@ -385,7 +385,7 @@ public class StateContext {
synchronized (parentDatanodeStateMachine
.getContainer()) {
synchronized (incrementalReportsQueue) {
- for (Map.Entry<InetSocketAddress, List<GeneratedMessage>>
+ for (Map.Entry<InetSocketAddress, List<Message>>
entry : incrementalReportsQueue.entrySet()) {
if (entry.getValue() != null) {
entry.getValue().removeIf(
@@ -403,24 +403,24 @@ public class StateContext {
}
@VisibleForTesting
- List<GeneratedMessage> getAllAvailableReportsUpToLimit(
+ List<Message> getAllAvailableReportsUpToLimit(
InetSocketAddress endpoint,
int limit) {
- List<GeneratedMessage> reports = getFullReports(endpoint, limit);
- List<GeneratedMessage> incrementalReports = getIncrementalReports(endpoint,
+ List<Message> reports = getFullReports(endpoint, limit);
+ List<Message> incrementalReports = getIncrementalReports(endpoint,
limit - reports.size()); // get all (MAX_VALUE)
reports.addAll(incrementalReports);
return reports;
}
- List<GeneratedMessage> getIncrementalReports(
+ List<Message> getIncrementalReports(
InetSocketAddress endpoint, int maxLimit) {
- List<GeneratedMessage> reportsToReturn = new LinkedList<>();
+ List<Message> reportsToReturn = new LinkedList<>();
synchronized (incrementalReportsQueue) {
- List<GeneratedMessage> reportsForEndpoint =
+ List<Message> reportsForEndpoint =
incrementalReportsQueue.get(endpoint);
if (reportsForEndpoint != null) {
- List<GeneratedMessage> tempList = reportsForEndpoint.subList(
+ List<Message> tempList = reportsForEndpoint.subList(
0, min(reportsForEndpoint.size(), maxLimit));
reportsToReturn.addAll(tempList);
tempList.clear();
@@ -429,11 +429,11 @@ public class StateContext {
return reportsToReturn;
}
- List<GeneratedMessage> getFullReports(
+ List<Message> getFullReports(
InetSocketAddress endpoint, int maxLimit) {
int count = 0;
Map<String, AtomicBoolean> mp = isFullReportReadyToBeSent.get(endpoint);
- List<GeneratedMessage> fullReports = new LinkedList<>();
+ List<Message> fullReports = new LinkedList<>();
if (null != mp) {
for (Map.Entry<String, AtomicBoolean> kv : mp.entrySet()) {
if (count == maxLimit) {
@@ -441,13 +441,13 @@ public class StateContext {
}
if (kv.getValue().get()) {
String reportType = kv.getKey();
- final AtomicReference<GeneratedMessage> ref =
+ final AtomicReference<Message> ref =
type2Reports.get(reportType);
if (ref == null) {
throw new RuntimeException(reportType + " is not a valid full "
+ "report type!");
}
- final GeneratedMessage msg = ref.get();
+ final Message msg = ref.get();
if (msg != null) {
fullReports.add(msg);
// Mark the report as not ready to be sent, until another refresh.
@@ -890,22 +890,22 @@ public class StateContext {
}
@VisibleForTesting
- public GeneratedMessage getContainerReports() {
+ public Message getContainerReports() {
return containerReports.get();
}
@VisibleForTesting
- public GeneratedMessage getNodeReport() {
+ public Message getNodeReport() {
return nodeReport.get();
}
@VisibleForTesting
- public GeneratedMessage getPipelineReports() {
+ public Message getPipelineReports() {
return pipelineReports.get();
}
@VisibleForTesting
- public GeneratedMessage getCRLStatusReport() {
+ public Message getCRLStatusReport() {
return crlStatusReport.get();
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 45838b5fd8..ccb0e8b7d7 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.ozone.container.common.states.endpoint;
import com.google.common.base.Preconditions;
import com.google.protobuf.Descriptors;
-import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.Message;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
@@ -200,7 +200,7 @@ public class HeartbeatEndpointTask
// TODO: Make it generic.
private void putBackIncrementalReports(
SCMHeartbeatRequestProto.Builder requestBuilder) {
- List<GeneratedMessage> reports = new LinkedList<>();
+ List<Message> reports = new LinkedList<>();
// We only put back CommandStatusReports and IncrementalContainerReport
// because those are incremental. Container/Node/PipelineReport are
// accumulative so we can keep only the latest of each.
@@ -219,7 +219,7 @@ public class HeartbeatEndpointTask
* @param requestBuilder builder to which the report has to be added.
*/
private void addReports(SCMHeartbeatRequestProto.Builder requestBuilder) {
- for (GeneratedMessage report :
+ for (Message report :
context.getAllAvailableReports(rpcEndpoint.getAddress())) {
String reportName = report.getDescriptorForType().getFullName();
for (Descriptors.FieldDescriptor descriptor :
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java
index 893aa897d1..101e70d6fc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java
@@ -19,13 +19,13 @@ package org.apache.hadoop.ozone.protocol.commands;
import java.util.UUID;
-import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.Message;
import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
/**
* Command for the datanode with the destination address.
*/
-public class CommandForDatanode<T extends GeneratedMessage> implements
+public class CommandForDatanode<T extends Message> implements
IdentifiableEventPayload {
private final UUID datanodeId;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
index 85f6475f8d..744118e301 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
@@ -17,7 +17,7 @@
*/
package org.apache.hadoop.ozone.protocol.commands;
-import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.Message;
import org.apache.hadoop.hdds.HddsIdFactory;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
@@ -28,7 +28,7 @@ import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
* commands in protobuf format.
* @param <T>
*/
-public abstract class SCMCommand<T extends GeneratedMessage> implements
+public abstract class SCMCommand<T extends Message> implements
IdentifiableEventPayload {
private final long id;
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
index f2770d2941..7068da443b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.ozone.container.common.report;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.Descriptors;
-import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.Message;
import java.io.IOException;
import java.util.ArrayList;
@@ -85,7 +85,7 @@ public class TestReportPublisher {
}
@Override
- protected GeneratedMessage getReport() {
+ protected Message getReport() {
getReportCount++;
return null;
}
@@ -195,7 +195,7 @@ public class TestReportPublisher {
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Unit test ReportManager Thread - %d").build());
publisher.init(dummyContext, executorService);
- GeneratedMessage report =
+ Message report =
((CRLStatusReportPublisher) publisher).getReport();
Assert.assertNotNull(report);
for (Descriptors.FieldDescriptor descriptor :
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
index f592be9797..225ee6cc5f 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
@@ -66,7 +66,7 @@ import org.apache.ozone.test.LambdaTestUtils;
import org.junit.Assert;
import org.junit.Test;
-import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.Message;
/**
* Test class for Datanode StateContext.
@@ -247,10 +247,10 @@ public class TestStateContext {
}
}
- void checkReportCount(List<GeneratedMessage> reports,
+ void checkReportCount(List<Message> reports,
Map<String, Integer> expectedReportCount) {
Map<String, Integer> reportCount = new HashMap<>();
- for (GeneratedMessage report : reports) {
+ for (Message report : reports) {
final String reportName = report.getDescriptorForType().getFullName();
reportCount.put(reportName, reportCount.getOrDefault(reportName, 0) + 1);
}
@@ -272,7 +272,7 @@ public class TestStateContext {
assertNull(context1.getContainerReports());
assertNull(context1.getNodeReport());
assertNull(context1.getPipelineReports());
- GeneratedMessage containerReports =
+ Message containerReports =
newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME);
context1.refreshFullReport(containerReports);
@@ -284,7 +284,7 @@ public class TestStateContext {
// NodeReport
StateContext context2 = newStateContext(conf, datanodeStateMachineMock);
- GeneratedMessage nodeReport =
+ Message nodeReport =
newMockReport(StateContext.NODE_REPORT_PROTO_NAME);
context2.refreshFullReport(nodeReport);
@@ -296,7 +296,7 @@ public class TestStateContext {
// PipelineReports
StateContext context3 = newStateContext(conf, datanodeStateMachineMock);
- GeneratedMessage pipelineReports =
+ Message pipelineReports =
newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME);
context3.refreshFullReport(pipelineReports);
@@ -318,8 +318,8 @@ public class TestStateContext {
return stateContext;
}
- private GeneratedMessage newMockReport(String messageType) {
- GeneratedMessage report = mock(GeneratedMessage.class);
+ private Message newMockReport(String messageType) {
+ Message report = mock(Message.class);
if (StateContext
.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME.equals(messageType)) {
report =
@@ -343,7 +343,7 @@ public class TestStateContext {
InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001);
InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001);
- GeneratedMessage generatedMessage =
+ Message generatedMessage =
newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME);
// Try to add report with zero endpoint. Should not be stored.
@@ -356,7 +356,7 @@ public class TestStateContext {
// Add report. Should be added to all endpoints.
stateContext.addIncrementalReport(generatedMessage);
- List<GeneratedMessage> allAvailableReports =
+ List<Message> allAvailableReports =
stateContext.getAllAvailableReports(scm1);
assertEquals(1, allAvailableReports.size());
assertEquals(1, stateContext.getAllAvailableReports(scm2).size());
@@ -652,4 +652,4 @@ public class TestStateContext {
summary.get(SCMCommandProto.Type.closeContainerCommand).intValue());
}
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecRegistry.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecRegistry.java
index b358ae484d..2d5da45e8f 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecRegistry.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecRegistry.java
@@ -19,11 +19,15 @@
package org.apache.hadoop.hdds.utils.db;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
+import java.util.List;
import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.ClassUtils;
+
/**
* Collection of available codecs.
*/
@@ -105,15 +109,17 @@ public class CodecRegistry {
*/
private <T> Codec getCodec(Class<T> format) throws IOException {
Codec<T> codec;
- if (valueCodecs.containsKey(format)) {
- codec = (Codec<T>) valueCodecs.get(format);
- } else if (valueCodecs.containsKey(format.getSuperclass())) {
- codec = (Codec<T>) valueCodecs.get(format.getSuperclass());
- } else {
- throw new IllegalStateException(
- "Codec is not registered for type: " + format);
+ final List<Class<?>> classes = new ArrayList<>();
+ classes.add(format);
+ classes.addAll(ClassUtils.getAllSuperclasses(format));
+ classes.addAll(ClassUtils.getAllInterfaces(format));
+ for (Class<?> clazz : classes) {
+ if (valueCodecs.containsKey(clazz)) {
+ return (Codec<T>) valueCodecs.get(clazz);
+ }
}
- return codec;
+ throw new IllegalStateException(
+ "Codec is not registered for type: " + format);
}
/**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
index af4f7430ab..27c7a12e66 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.hdds.scm.container.replication;
-import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.Message;
import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigType;
@@ -1610,7 +1610,7 @@ public class LegacyReplicationManager {
* @param tracker Tracker which tracks the inflight actions
* @param <T> Type of SCMCommand
*/
- private <T extends GeneratedMessage> boolean sendAndTrackDatanodeCommand(
+ private <T extends Message> boolean sendAndTrackDatanodeCommand(
final DatanodeDetails datanode,
final SCMCommand<T> command,
final Predicate<InflightAction> tracker) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java
index 3c73d11050..ef496d9964 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/StatefulService.java
@@ -19,7 +19,7 @@
package org.apache.hadoop.hdds.scm.ha;
import com.google.protobuf.ByteString;
-import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.Message;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
@@ -42,13 +42,13 @@ public abstract class StatefulService implements SCMService {
}
/**
- * Persists the specified {@link GeneratedMessage} configurationMessage
+ * Persists the specified {@link Message} configurationMessage
* to RocksDB with this service's {@link SCMService#getServiceName()} as the
* key.
- * @param configurationMessage configuration GeneratedMessage to persist
+ * @param configurationMessage configuration Message to persist
* @throws IOException on failure to persist configuration
*/
- protected final void saveConfiguration(GeneratedMessage configurationMessage)
+ protected final void saveConfiguration(Message configurationMessage)
throws IOException, TimeoutException {
stateManager.saveConfiguration(getServiceName(),
configurationMessage.toByteString());
@@ -64,7 +64,7 @@ public abstract class StatefulService implements SCMService {
* @throws IOException on failure to fetch the message from DB or when
* parsing it. ensure the specified configType is correct
*/
- protected final <T extends GeneratedMessage> T readConfiguration(
+ protected final <T extends Message> T readConfiguration(
Class<T> configType) throws IOException {
ByteString byteString = stateManager.readConfiguration(getServiceName());
if (byteString == null) {
@@ -77,7 +77,7 @@ public abstract class StatefulService implements SCMService {
} catch (NoSuchMethodException | IllegalAccessException
| InvocationTargetException e) {
e.printStackTrace();
- throw new IOException("GeneratedMessage cannot be parsed. Ensure that "
+ throw new IOException("Message cannot be parsed. Ensure that "
+ configType + " is the correct expected message type for " +
this.getServiceName(), e);
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java
index 18b147207a..6c75593be1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java
@@ -18,14 +18,15 @@
package org.apache.hadoop.hdds.scm.ha.io;
import com.google.protobuf.ByteString;
-import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.ProtocolMessageEnum;
+import org.apache.commons.lang3.ClassUtils;
+
import java.math.BigInteger;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -38,7 +39,7 @@ public final class CodecFactory {
private static Map<Class<?>, Codec> codecs = new HashMap<>();
static {
- codecs.put(GeneratedMessage.class, new GeneratedMessageCodec());
+ codecs.put(Message.class, new GeneratedMessageCodec());
codecs.put(ProtocolMessageEnum.class, new EnumCodec());
codecs.put(List.class, new ListCodec());
codecs.put(Integer.class, new IntegerCodec());
@@ -56,8 +57,8 @@ public final class CodecFactory {
throws InvalidProtocolBufferException {
final List<Class<?>> classes = new ArrayList<>();
classes.add(type);
- classes.add(type.getSuperclass());
- classes.addAll(Arrays.asList(type.getInterfaces()));
+ classes.addAll(ClassUtils.getAllSuperclasses(type));
+ classes.addAll(ClassUtils.getAllInterfaces(type));
for (Class<?> clazz : classes) {
if (codecs.containsKey(clazz)) {
return codecs.get(clazz);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/EnumCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/EnumCodec.java
index 6f1f2f18ed..9c184ca31d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/EnumCodec.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/EnumCodec.java
@@ -47,7 +47,7 @@ public class EnumCodec implements Codec {
} catch (NoSuchMethodException | IllegalAccessException
| InvocationTargetException ex) {
throw new InvalidProtocolBufferException(
- "GeneratedMessage cannot be decoded!" + ex.getMessage());
+ "Message cannot be decoded!" + ex.getMessage());
}
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/GeneratedMessageCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/GeneratedMessageCodec.java
index 45ab5df7d4..50e44a4ca6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/GeneratedMessageCodec.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/GeneratedMessageCodec.java
@@ -18,34 +18,34 @@
package org.apache.hadoop.hdds.scm.ha.io;
import com.google.protobuf.ByteString;
-import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hdds.scm.ha.ReflectionUtil;
import java.lang.reflect.InvocationTargetException;
/**
- * {@link Codec} for {@link GeneratedMessage} objects.
+ * {@link Codec} for {@link Message} objects.
*/
public class GeneratedMessageCodec implements Codec {
@Override
public ByteString serialize(Object object) {
- return ((GeneratedMessage)object).toByteString();
+ return ((Message)object).toByteString();
}
@Override
- public GeneratedMessage deserialize(Class<?> type, ByteString value)
+ public Message deserialize(Class<?> type, ByteString value)
throws InvalidProtocolBufferException {
try {
- return (GeneratedMessage) ReflectionUtil.getMethod(type,
+ return (Message) ReflectionUtil.getMethod(type,
"parseFrom", byte[].class)
.invoke(null, (Object) value.toByteArray());
} catch (NoSuchMethodException | IllegalAccessException
| InvocationTargetException ex) {
ex.printStackTrace();
throw new InvalidProtocolBufferException(
- "GeneratedMessage cannot be decoded: " + ex.getMessage());
+ "Message cannot be decoded: " + ex.getMessage());
}
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ListCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ListCodec.java
index 2560733315..0667b8776f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ListCodec.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ListCodec.java
@@ -66,7 +66,7 @@ public class ListCodec implements Codec {
IllegalAccessException | InvocationTargetException |
ClassNotFoundException ex) {
throw new InvalidProtocolBufferException(
- "GeneratedMessage cannot be decoded: " + ex.getMessage());
+ "Message cannot be decoded: " + ex.getMessage());
}
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index 407e5afe6d..16b5ef8fba 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -44,7 +44,7 @@ import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -210,7 +210,7 @@ public final class SCMDatanodeHeartbeatDispatcher {
/**
* Wrapper class for events with the datanode origin.
*/
- public static class ReportFromDatanode<T extends GeneratedMessage> {
+ public static class ReportFromDatanode<T extends Message> {
private final DatanodeDetails datanodeDetails;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]