This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 7bdacde4 Format code to pass check-style and make all tests can pass
(#309)
7bdacde4 is described below
commit 7bdacde4af7c84870e5961cc625159f07bb3396d
Author: rongtong <[email protected]>
AuthorDate: Sun Sep 11 11:22:03 2022 +0800
Format code to pass check-style and make all tests can pass (#309)
---
.github/workflows/maven.yml | 2 +-
.../connect/runtime/common/ConnectKeyValue.java | 9 +-
.../converter/record/json/JsonConverter.java | 33 ++----
.../runtime/serialization/JsonDeserializer.java | 7 +-
.../runtime/serialization/ListSerializer.java | 3 -
.../runtime/serialization/WrapperSerde.java | 17 +++
.../serialization/store/ConnectKeyValueSerde.java | 4 +-
.../store/ConnectKeyValueSerializer.java | 3 -
.../store/RecordOffsetDeserializer.java | 1 -
.../store/RecordPositionMapSerde.java | 6 +-
.../service/AbstractConfigManagementService.java | 7 +-
.../service/ConfigManagementServiceImpl.java | 126 ++++++++++-----------
.../service/PositionManagementServiceImpl.java | 29 ++---
.../service/StateManagementServiceImpl.java | 71 ++++++------
.../runtime/store/FileBaseKeyValueStore.java | 2 +-
.../runtime/utils/datasync/BrokerBasedLog.java | 38 ++++---
.../runtime/connectorwrapper/WorkerTest.java | 2 +-
17 files changed, 172 insertions(+), 188 deletions(-)
diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index 5672ed5d..46cb15d6 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -24,4 +24,4 @@ jobs:
distribution: 'temurin'
cache: maven
- name: Build with Maven
- run: mvn -B package --file pom.xml
\ No newline at end of file
+ run: mvn -B clean install
\ No newline at end of file
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValue.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValue.java
index 138495e4..f5612d37 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValue.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValue.java
@@ -137,7 +137,6 @@ public class ConnectKeyValue implements KeyValue,
Serializable, Cloneable {
this.properties = properties;
}
-
/**
* Gets all original settings with the given prefix.
*/
@@ -147,6 +146,7 @@ public class ConnectKeyValue implements KeyValue,
Serializable, Cloneable {
/**
* Gets all original settings with the given prefix.
+ *
* @param prefix the prefix to use as a filter
* @param strip strip the prefix before adding to the output if set true
* @return a Map containing the settings with the prefix
@@ -181,7 +181,7 @@ public class ConnectKeyValue implements KeyValue,
Serializable, Cloneable {
this.epoch = epoch;
}
- public ConnectKeyValue nextGeneration(){
+ public ConnectKeyValue nextGeneration() {
this.setEpoch(System.currentTimeMillis());
return this;
}
@@ -200,7 +200,6 @@ public class ConnectKeyValue implements KeyValue,
Serializable, Cloneable {
return properties.hashCode();
}
-
@Override
public Object clone() {
try {
@@ -210,7 +209,6 @@ public class ConnectKeyValue implements KeyValue,
Serializable, Cloneable {
}
}
-
@Override
public String toString() {
return "ConnectKeyValue{" +
@@ -218,7 +216,4 @@ public class ConnectKeyValue implements KeyValue,
Serializable, Cloneable {
'}';
}
-
-
-
}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java
index 7667bc8b..5094d405 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java
@@ -53,14 +53,12 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
-
/**
* json converter for fastjson
*/
public class JsonConverter implements RecordConverter {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
-
private static final Map<FieldType, JsonToConnectTypeConverter>
TO_CONNECT_CONVERTERS = new EnumMap<>(FieldType.class);
{
@@ -161,8 +159,8 @@ public class JsonConverter implements RecordConverter {
throw new ConnectException("Found invalid map
entry, expected length 2 but found :" + entryArray.toArray().length);
}
result.put(
- convertToConnect(keySchema,
entryArray.toArray()[0]),
- convertToConnect(valueSchema,
entryArray.toArray()[1]));
+ convertToConnect(keySchema,
entryArray.toArray()[0]),
+ convertToConnect(valueSchema,
entryArray.toArray()[1]));
}
}
return result;
@@ -281,7 +279,6 @@ public class JsonConverter implements RecordConverter {
});
}
-
private JsonDeserializer deserializer = new JsonDeserializer();
private JsonSerializer serializer = new JsonSerializer();
public JsonConverterConfig converterConfig;
@@ -303,9 +300,9 @@ public class JsonConverter implements RecordConverter {
/**
* Convert a rocketmq Connect data object to a native object for
serialization.
*
- * @param topic the topic associated with the data
+ * @param topic the topic associated with the data
* @param schema the schema for the value
- * @param value the value to convert
+ * @param value the value to convert
* @return the serialized value
*/
@Override
@@ -354,12 +351,11 @@ public class JsonConverter implements RecordConverter {
Object jsonSchema =
newJsonValue.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME);
Schema schema = asConnectSchema(jsonSchema == null ? null :
(JSONObject) jsonSchema);
return new SchemaAndValue(
- schema,
- convertToConnect(schema,
newJsonValue.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
+ schema,
+ convertToConnect(schema,
newJsonValue.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
);
}
-
/**
* convert to json with envelope
*
@@ -369,8 +365,8 @@ public class JsonConverter implements RecordConverter {
*/
private JSONObject convertToJsonWithEnvelope(Schema schema, Object value) {
return new JsonSchema.Envelope(
- asJsonSchema(schema),
- convertToJson(schema, value)
+ asJsonSchema(schema),
+ convertToJson(schema, value)
).toJsonNode();
}
@@ -385,7 +381,6 @@ public class JsonConverter implements RecordConverter {
return convertToJson(schema, value);
}
-
private interface JsonToConnectTypeConverter<Output> {
Output convert(Schema schema, Object value);
}
@@ -396,7 +391,6 @@ public class JsonConverter implements RecordConverter {
Object toConnect(Schema schema, Object value);
}
-
/**
* convert ConnectRecord schema to json schema
*
@@ -506,7 +500,6 @@ public class JsonConverter implements RecordConverter {
return jsonSchema;
}
-
/**
* Convert this object, in the org.apache.kafka.connect.data format, into
a JSON object, returning both the schema
* and the converted object.
@@ -591,22 +584,22 @@ public class JsonConverter implements RecordConverter {
}
JSONArray resultArray = new JSONArray();
- Map<String , Object> resultMap = new HashMap<>();
+ Map<String, Object> resultMap = new HashMap<>();
for (Map.Entry<?, ?> entry : map.entrySet()) {
Schema keySchema = schema == null ? null :
schema.getKeySchema();
Schema valueSchema = schema == null ? null :
schema.getValueSchema();
Object mapKey = convertToJson(keySchema,
entry.getKey());
Object mapValue = convertToJson(valueSchema,
entry.getValue());
- if (objectMode){
+ if (objectMode) {
resultMap.put((String) mapKey, mapValue);
- }else {
+ } else {
JSONArray entryArray = new JSONArray();
entryArray.add(0, mapKey);
entryArray.add(1, mapValue);
resultArray.add(entryArray);
}
}
- return objectMode? resultMap: resultArray;
+ return objectMode ? resultMap : resultArray;
}
case STRUCT: {
Struct struct = (Struct) value;
@@ -627,7 +620,6 @@ public class JsonConverter implements RecordConverter {
}
}
-
/**
* convert json to schema if not empty
*
@@ -713,7 +705,6 @@ public class JsonConverter implements RecordConverter {
throw new ConnectException("Unknown schema type: " +
schemaType);
}
-
// optional
Boolean isOptional =
jsonSchema.getBoolean(JsonSchema.SCHEMA_OPTIONAL_FIELD_NAME);
if (isOptional != null && isOptional) {
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/JsonDeserializer.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/JsonDeserializer.java
index 5fb0979b..cc46a494 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/JsonDeserializer.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/JsonDeserializer.java
@@ -27,8 +27,11 @@ import java.util.Objects;
*/
public class JsonDeserializer implements Deserializer<Object> {
private Class aClass;
- public JsonDeserializer(){}
- public JsonDeserializer(Class aClass){
+
+ public JsonDeserializer() {
+ }
+
+ public JsonDeserializer(Class aClass) {
this.aClass = aClass;
}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/ListSerializer.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/ListSerializer.java
index 5cf909ea..7aec653f 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/ListSerializer.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/ListSerializer.java
@@ -18,13 +18,10 @@
package org.apache.rocketmq.connect.runtime.serialization;
import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-import io.openmessaging.connector.api.data.Converter;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.UnsupportedEncodingException;
import java.util.List;
/**
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/WrapperSerde.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/WrapperSerde.java
index f7a2dfd6..8c7b0290 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/WrapperSerde.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/WrapperSerde.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.rocketmq.connect.runtime.serialization;
import java.util.Map;
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerde.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerde.java
index 8d75b5fd..ce86c2c5 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerde.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerde.java
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.rocketmq.connect.runtime.serialization.store;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
@@ -33,9 +32,10 @@ public class ConnectKeyValueSerde extends
WrapperSerde<ConnectKeyValue> {
/**
* serializer and deserializer
+ *
* @return
*/
- public static ConnectKeyValueSerde serde(){
+ public static ConnectKeyValueSerde serde() {
return new ConnectKeyValueSerde(new ConnectKeyValueSerializer(), new
ConnectKeyValueDeserializer());
}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerializer.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerializer.java
index 93fc717d..7042f302 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerializer.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerializer.java
@@ -18,15 +18,12 @@
package org.apache.rocketmq.connect.runtime.serialization.store;
import com.alibaba.fastjson.JSON;
-import io.openmessaging.connector.api.data.Converter;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.UnsupportedEncodingException;
-
/**
* Converter data between ConnAndTaskConfigs and byte[].
*/
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordOffsetDeserializer.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordOffsetDeserializer.java
index dad03345..ce09f518 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordOffsetDeserializer.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordOffsetDeserializer.java
@@ -18,7 +18,6 @@
package org.apache.rocketmq.connect.runtime.serialization.store;
import com.alibaba.fastjson.JSON;
-import io.openmessaging.connector.api.data.Converter;
import io.openmessaging.connector.api.data.RecordOffset;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.serialization.Deserializer;
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapSerde.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapSerde.java
index c8585ed7..b239c2d5 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapSerde.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapSerde.java
@@ -29,15 +29,17 @@ import java.util.Map;
* Byte Map to byte[].
*/
public class RecordPositionMapSerde extends
WrapperSerde<Map<ExtendRecordPartition, RecordOffset>> {
- public RecordPositionMapSerde(Serializer<Map<ExtendRecordPartition,
RecordOffset>> serializer, Deserializer<Map<ExtendRecordPartition,
RecordOffset>> deserializer) {
+ public RecordPositionMapSerde(Serializer<Map<ExtendRecordPartition,
RecordOffset>> serializer,
+ Deserializer<Map<ExtendRecordPartition, RecordOffset>> deserializer) {
super(serializer, deserializer);
}
/**
* serializer and deserializer
+ *
* @return
*/
- public static RecordPositionMapSerde serde(){
+ public static RecordPositionMapSerde serde() {
return new RecordPositionMapSerde(new RecordPositionMapSerializer(),
new RecordPositionMapDeserializer());
}
}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java
index f2325899..cbdcc45a 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java
@@ -59,7 +59,6 @@ public abstract class AbstractConfigManagementService
implements ConfigManagemen
*/
protected KeyValueStore<String, ConnectKeyValue> connectorKeyValueStore;
-
@Override
public void recomputeTaskConfigs(String connectorName, ConnectKeyValue
configs) {
int maxTask = configs.getInt(ConnectorConfig.MAX_TASK,
ConnectorConfig.TASKS_MAX_DEFAULT);
@@ -86,11 +85,11 @@ public abstract class AbstractConfigManagementService
implements ConfigManagemen
newKeyValue.put(ConnectorConfig.TASK_CLASS,
connector.taskClass().getName());
// source topic
- if (configs.containsKey(SourceConnectorConfig.CONNECT_TOPICNAME)){
+ if (configs.containsKey(SourceConnectorConfig.CONNECT_TOPICNAME)) {
newKeyValue.put(SourceConnectorConfig.CONNECT_TOPICNAME,
configs.getString(SourceConnectorConfig.CONNECT_TOPICNAME));
}
// sink consume topic
- if (configs.containsKey(SinkConnectorConfig.CONNECT_TOPICNAMES)){
+ if (configs.containsKey(SinkConnectorConfig.CONNECT_TOPICNAMES)) {
newKeyValue.put(SinkConnectorConfig.CONNECT_TOPICNAMES,
configs.getString(SinkConnectorConfig.CONNECT_TOPICNAMES));
}
@@ -108,7 +107,6 @@ public abstract class AbstractConfigManagementService
implements ConfigManagemen
protected abstract void putTaskConfigs(String connectorName,
List<ConnectKeyValue> configs);
-
@NotNull
protected Connector loadConnector(ConnectKeyValue configs) {
String connectorClass =
configs.getString(ConnectorConfig.CONNECTOR_CLASS);
@@ -118,7 +116,6 @@ public abstract class AbstractConfigManagementService
implements ConfigManagemen
return connector;
}
-
@Override
public ClusterConfigState snapshot() {
if (taskKeyValueStore == null && connectorKeyValueStore == null) {
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
index 2e85482e..4be6669a 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
@@ -67,27 +67,29 @@ public class ConfigManagementServiceImpl extends
AbstractConfigManagementService
public static final String START_SIGNAL = "start-signal";
public static final String TARGET_STATE_PREFIX = "target-state-";
+
public static String TARGET_STATE_KEY(String connectorName) {
return TARGET_STATE_PREFIX + connectorName;
}
-
public static final String CONNECTOR_PREFIX = "connector-";
+
public static String CONNECTOR_KEY(String connectorName) {
return CONNECTOR_PREFIX + connectorName;
}
public static final String TASK_PREFIX = "task-";
+
public static String TASK_KEY(ConnectorTaskId taskId) {
return TASK_PREFIX + taskId.connector() + "-" + taskId.task();
}
public static final String DELETE_CONNECTOR_PREFIX = "delete-";
+
public static String DELETE_CONNECTOR_KEY(String connectorName) {
return DELETE_CONNECTOR_PREFIX + connectorName;
}
-
private static final String FIELD_STATE = "state";
private static final String FIELD_EPOCH = "epoch";
private static final String FIELD_PROPS = "properties";
@@ -96,49 +98,48 @@ public class ConfigManagementServiceImpl extends
AbstractConfigManagementService
* start signal
*/
public static final Schema START_SIGNAL_V0 = SchemaBuilder.struct()
- .field(START_SIGNAL, SchemaBuilder.string().build())
- .build();
+ .field(START_SIGNAL, SchemaBuilder.string().build())
+ .build();
/**
* connector configuration
*/
public static final Schema CONNECTOR_CONFIGURATION_V0 =
SchemaBuilder.struct()
- .field(FIELD_STATE, SchemaBuilder.string().build())
- .field(FIELD_EPOCH, SchemaBuilder.int64().build())
- .field(FIELD_PROPS,
- SchemaBuilder.map(
- SchemaBuilder.string().optional().build(),
- SchemaBuilder.string().optional().build()
- ).build())
- .build();
+ .field(FIELD_STATE, SchemaBuilder.string().build())
+ .field(FIELD_EPOCH, SchemaBuilder.int64().build())
+ .field(FIELD_PROPS,
+ SchemaBuilder.map(
+ SchemaBuilder.string().optional().build(),
+ SchemaBuilder.string().optional().build()
+ ).build())
+ .build();
/**
* delete connector
*/
public static final Schema CONNECTOR_DELETE_CONFIGURATION_V0 =
SchemaBuilder.struct()
- .field(FIELD_EPOCH, SchemaBuilder.int64().build())
- .build();
+ .field(FIELD_EPOCH, SchemaBuilder.int64().build())
+ .build();
/**
* task configuration
*/
- public static final Schema TASK_CONFIGURATION_V0 =SchemaBuilder.struct()
- .field(FIELD_EPOCH, SchemaBuilder.int64().build())
- .field(FIELD_PROPS,
- SchemaBuilder.map(
- SchemaBuilder.string().build(),
- SchemaBuilder.string().optional().build()
- ).build())
- .build();
+ public static final Schema TASK_CONFIGURATION_V0 = SchemaBuilder.struct()
+ .field(FIELD_EPOCH, SchemaBuilder.int64().build())
+ .field(FIELD_PROPS,
+ SchemaBuilder.map(
+ SchemaBuilder.string().build(),
+ SchemaBuilder.string().optional().build()
+ ).build())
+ .build();
/**
* connector state
*/
public static final Schema TARGET_STATE_V0 = SchemaBuilder.struct()
- .field(FIELD_STATE, SchemaBuilder.string().build())
- .field(FIELD_EPOCH, SchemaBuilder.int64().build())
- .build();
-
+ .field(FIELD_STATE, SchemaBuilder.string().build())
+ .field(FIELD_EPOCH, SchemaBuilder.int64().build())
+ .build();
/**
* All listeners to trigger while config change.
@@ -157,7 +158,8 @@ public class ConfigManagementServiceImpl extends
AbstractConfigManagementService
// converter
public RecordConverter converter;
- public ConfigManagementServiceImpl() {}
+ public ConfigManagementServiceImpl() {
+ }
@Override
public void initialize(WorkerConfig workerConfig, RecordConverter
converter, Plugin plugin) {
@@ -169,29 +171,28 @@ public class ConfigManagementServiceImpl extends
AbstractConfigManagementService
this.connectorConfigUpdateListener = new HashSet<>();
this.dataSynchronizer = new BrokerBasedLog<>(workerConfig,
- this.topic,
- ConnectUtil.createGroupName(configManagePrefix,
workerConfig.getWorkerId()),
- new ConfigChangeCallback(),
- Serdes.serdeFrom(String.class),
- Serdes.serdeFrom(byte[].class)
+ this.topic,
+ ConnectUtil.createGroupName(configManagePrefix,
workerConfig.getWorkerId()),
+ new ConfigChangeCallback(),
+ Serdes.serdeFrom(String.class),
+ Serdes.serdeFrom(byte[].class)
);
// store connector config
this.connectorKeyValueStore = new FileBaseKeyValueStore<>(
-
FilePathConfigUtil.getConnectorConfigPath(workerConfig.getStorePathRootDir()),
- new Serdes.StringSerde(),
- new JsonSerde(ConnectKeyValue.class));
+
FilePathConfigUtil.getConnectorConfigPath(workerConfig.getStorePathRootDir()),
+ new Serdes.StringSerde(),
+ new JsonSerde(ConnectKeyValue.class));
// store task config
this.taskKeyValueStore = new FileBaseKeyValueStore<>(
-
FilePathConfigUtil.getTaskConfigPath(workerConfig.getStorePathRootDir()),
- new Serdes.StringSerde(),
- new ListSerde(ConnectKeyValue.class));
+
FilePathConfigUtil.getTaskConfigPath(workerConfig.getStorePathRootDir()),
+ new Serdes.StringSerde(),
+ new ListSerde(ConnectKeyValue.class));
this.prepare(workerConfig);
}
-
/**
* Preparation before startup
*
@@ -216,7 +217,7 @@ public class ConfigManagementServiceImpl extends
AbstractConfigManagementService
private void sendStartSignal() {
Struct struct = new Struct(START_SIGNAL_V0);
- struct.put(START_SIGNAL,"start");
+ struct.put(START_SIGNAL, "start");
dataSynchronizer.send(START_SIGNAL, converter.fromConnectData(topic,
START_SIGNAL_V0, struct));
}
@@ -233,7 +234,6 @@ public class ConfigManagementServiceImpl extends
AbstractConfigManagementService
return connectorKeyValueStore.getKVMap();
}
-
@Override
public String putConnectorConfig(String connectorName, ConnectKeyValue
configs) {
// check request config
@@ -266,7 +266,7 @@ public class ConfigManagementServiceImpl extends
AbstractConfigManagementService
Struct connectConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
connectConfig.put(FIELD_STATE, configs.getTargetState().name());
connectConfig.put(FIELD_EPOCH, configs.getEpoch());
- connectConfig.put(FIELD_PROPS , configs.getProperties());
+ connectConfig.put(FIELD_PROPS, configs.getProperties());
byte[] config = converter.fromConnectData(topic,
CONNECTOR_CONFIGURATION_V0, connectConfig);
dataSynchronizer.send(CONNECTOR_KEY(connectorName), config);
return connectorName;
@@ -377,38 +377,37 @@ public class ConfigManagementServiceImpl extends
AbstractConfigManagementService
configs.setConnectorConfigs(connectorKeyValueStore.getKVMap());
connectorKeyValueStore.getKVMap().forEach((connectName,
connectKeyValue) -> {
Struct struct = new Struct(CONNECTOR_CONFIGURATION_V0)
- .put(FIELD_EPOCH, connectKeyValue.getEpoch())
- .put(FIELD_STATE, connectKeyValue.getTargetState().name())
- .put(FIELD_PROPS, connectKeyValue.getProperties());
+ .put(FIELD_EPOCH, connectKeyValue.getEpoch())
+ .put(FIELD_STATE, connectKeyValue.getTargetState().name())
+ .put(FIELD_PROPS, connectKeyValue.getProperties());
byte[] body = converter.fromConnectData(topic,
CONNECTOR_CONFIGURATION_V0, struct);
dataSynchronizer.send(CONNECTOR_KEY(connectName), body);
});
taskKeyValueStore.getKVMap().forEach((connectName, taskConfigs) -> {
- if (taskConfigs == null || taskConfigs.isEmpty()){
+ if (taskConfigs == null || taskConfigs.isEmpty()) {
return;
}
taskConfigs.forEach(taskConfig -> {
ConnectorTaskId taskId = new ConnectorTaskId(connectName,
taskConfig.getInt(ConnectorConfig.TASK_ID));
Struct struct = new Struct(TASK_CONFIGURATION_V0)
- .put(FIELD_EPOCH, System.currentTimeMillis())
- .put(FIELD_PROPS, taskConfig.getProperties());
+ .put(FIELD_EPOCH, System.currentTimeMillis())
+ .put(FIELD_PROPS, taskConfig.getProperties());
byte[] body = converter.fromConnectData(topic,
TASK_CONFIGURATION_V0, struct);
dataSynchronizer.send(TASK_KEY(taskId), body);
});
});
}
-
private class ConfigChangeCallback implements
DataSynchronizerCallback<String, byte[]> {
@Override
public void onCompletion(Throwable error, String key, byte[] value) {
- if (StringUtils.isEmpty(key)){
+ if (StringUtils.isEmpty(key)) {
log.error("Config change message is illegal, key is empty, the
message will be skipped");
return;
}
SchemaAndValue schemaAndValue = converter.toConnectData(topic,
value);
- if (key.equals(START_SIGNAL)){
+ if (key.equals(START_SIGNAL)) {
// send message in full
triggerSendMessage();
// reblance
@@ -455,7 +454,7 @@ public class ConfigManagementServiceImpl extends
AbstractConfigManagementService
// validate
ConnectKeyValue oldConfig = connectorKeyValueStore.get(connectorName);
// config update
- if ((Long)epoch > oldConfig.getEpoch()) {
+ if ((Long) epoch > oldConfig.getEpoch()) {
// remove
connectorKeyValueStore.remove(connectorName);
taskKeyValueStore.remove(connectorName);
@@ -464,7 +463,6 @@ public class ConfigManagementServiceImpl extends
AbstractConfigManagementService
}
}
-
/**
* process task config record
*
@@ -502,17 +500,17 @@ public class ConfigManagementServiceImpl extends
AbstractConfigManagementService
Struct struct = (Struct) schemaAndValue.value();
Object targetState = struct.get(FIELD_STATE);
- if (!(targetState instanceof String)){
+ if (!(targetState instanceof String)) {
// target state
log.error("Invalid data for target state for connector '{}':
'state' field should be a String but is {}",
- connectorName, className(targetState));
+ connectorName, className(targetState));
return;
}
Object epoch = struct.get(FIELD_EPOCH);
if (!(epoch instanceof Long)) {
// epoch
log.error("Invalid data for epoch for connector '{}': 'epoch'
field should be a Long but is {}",
- connectorName, className(epoch));
+ connectorName, className(epoch));
return;
}
@@ -535,31 +533,31 @@ public class ConfigManagementServiceImpl extends
AbstractConfigManagementService
* @return
*/
private boolean mergeConnectConfig(String connectName, SchemaAndValue
schemaAndValue) {
- Struct value = (Struct)schemaAndValue.value();
+ Struct value = (Struct) schemaAndValue.value();
Object targetState = value.get(FIELD_STATE);
- if (!(targetState instanceof String)){
+ if (!(targetState instanceof String)) {
// target state
log.error("Invalid data for target state for connector '{}':
'state' field should be a String but is {}",
- connectName, className(targetState));
+ connectName, className(targetState));
return false;
}
Object epoch = value.get(FIELD_EPOCH);
if (!(epoch instanceof Long)) {
// epoch
log.error("Invalid data for epoch for connector '{}': 'state'
field should be a long but is {}",
- connectName, className(epoch));
+ connectName, className(epoch));
return false;
}
Object props = value.get(FIELD_PROPS);
- if (!(props instanceof Map)){
+ if (!(props instanceof Map)) {
// properties
log.error("Invalid data for properties for connector '{}': 'state'
field should be a Map but is {}",
- connectName, className(props));
+ connectName, className(props));
return false;
}
// new configs
ConnectKeyValue newConfig = new ConnectKeyValue();
- newConfig.setEpoch((Long)epoch);
+ newConfig.setEpoch((Long) epoch);
newConfig.setTargetState(TargetState.valueOf((String) targetState));
newConfig.setProperties((Map<String, String>) props);
@@ -593,7 +591,6 @@ public class ConfigManagementServiceImpl extends
AbstractConfigManagementService
return StagingMode.DISTRIBUTED;
}
-
private ConnectorTaskId parseTaskId(String key) {
String[] parts = key.split("-");
if (parts.length < 3) {
@@ -609,7 +606,6 @@ public class ConfigManagementServiceImpl extends
AbstractConfigManagementService
}
}
-
private String className(Object o) {
return o != null ? o.getClass().getName() : "null";
}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
index 0b704dec..986bbc04 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
@@ -96,22 +96,22 @@ public class PositionManagementServiceImpl implements
PositionManagementService
this.keyConverter.configure(new HashMap<>());
this.valueConverter.configure(new HashMap<>());
this.dataSynchronizer = new BrokerBasedLog(
- workerConfig,
- this.topic,
- ConnectUtil.createGroupName(positionManagePrefix,
workerConfig.getWorkerId()),
- new PositionChangeCallback(),
- Serdes.serdeFrom(ByteBuffer.class),
- Serdes.serdeFrom(ByteBuffer.class)
+ workerConfig,
+ this.topic,
+ ConnectUtil.createGroupName(positionManagePrefix,
workerConfig.getWorkerId()),
+ new PositionChangeCallback(),
+ Serdes.serdeFrom(ByteBuffer.class),
+ Serdes.serdeFrom(ByteBuffer.class)
);
this.positionStore = new
FileBaseKeyValueStore<>(FilePathConfigUtil.getPositionPath(workerConfig.getStorePathRootDir()),
- new RecordPartitionSerde(),
- new RecordOffsetSerde());
+ new RecordPartitionSerde(),
+ new RecordOffsetSerde());
this.positionUpdateListener = new HashSet<>();
this.needSyncPartition = new ConcurrentSet<>();
this.commitStarted = -1;
- this.config = workerConfig;
+ this.config = workerConfig;
this.prepare(workerConfig);
}
@@ -153,7 +153,6 @@ public class PositionManagementServiceImpl implements
PositionManagementService
positionStore.load();
}
-
@Override
public Map<ExtendRecordPartition, RecordOffset> getPositionTable() {
return positionStore.getKVMap();
@@ -196,9 +195,7 @@ public class PositionManagementServiceImpl implements
PositionManagementService
if (!increment) {
Set<ExtendRecordPartition> allPartitions = new HashSet<>();
allPartitions.addAll(positionStore.getKVMap().keySet());
- allPartitions.forEach((partition) -> {
- set(PositionChange.POSITION_CHANG_KEY, partition,
positionStore.get(partition));
- });
+ allPartitions.forEach(partition ->
set(PositionChange.POSITION_CHANG_KEY, partition,
positionStore.get(partition)));
}
//Incremental send
if (increment) {
@@ -207,9 +204,7 @@ public class PositionManagementServiceImpl implements
PositionManagementService
return;
}
Set<ExtendRecordPartition> partitionsTmp = new
HashSet<>(needSyncPartition);
- partitionsTmp.forEach((partition) -> {
- set(PositionChange.POSITION_CHANG_KEY, partition,
positionStore.get(partition));
- });
+ partitionsTmp.forEach(partition ->
set(PositionChange.POSITION_CHANG_KEY, partition,
positionStore.get(partition)));
}
// end send offset
if (increment) {
@@ -279,7 +274,6 @@ public class PositionManagementServiceImpl implements
PositionManagementService
dataSynchronizer.send(keyBuffer, valueBuffer);
}
-
private class PositionChangeCallback implements
DataSynchronizerCallback<ByteBuffer, ByteBuffer> {
@Override
@@ -331,7 +325,6 @@ public class PositionManagementServiceImpl implements
PositionManagementService
}
}
-
/**
* Merge new received position info with local store.
*
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StateManagementServiceImpl.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StateManagementServiceImpl.java
index 3914d327..651b2319 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StateManagementServiceImpl.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StateManagementServiceImpl.java
@@ -61,8 +61,6 @@ public class StateManagementServiceImpl implements
StateManagementService {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
-
-
private final String statusManagePrefix = "StatusManage";
public static final String START_SIGNAL = "start-signal";
@@ -74,18 +72,18 @@ public class StateManagementServiceImpl implements
StateManagementService {
public static final String WORKER_ID_KEY_NAME = "worker_id";
public static final String GENERATION_KEY_NAME = "generation";
private static final Schema STATUS_SCHEMA_V0 = SchemaBuilder.struct()
- .field(STATE_KEY_NAME, SchemaBuilder.string().build())
- .field(TRACE_KEY_NAME, SchemaBuilder.string().optional().build())
- .field(WORKER_ID_KEY_NAME, SchemaBuilder.string().build())
- .field(GENERATION_KEY_NAME, SchemaBuilder.int64().build())
- .build();
+ .field(STATE_KEY_NAME, SchemaBuilder.string().build())
+ .field(TRACE_KEY_NAME, SchemaBuilder.string().optional().build())
+ .field(WORKER_ID_KEY_NAME, SchemaBuilder.string().build())
+ .field(GENERATION_KEY_NAME, SchemaBuilder.int64().build())
+ .build();
/**
* start signal
*/
public static final Schema START_SIGNAL_V0 = SchemaBuilder.struct()
- .field(START_SIGNAL, SchemaBuilder.string().build())
- .build();
+ .field(START_SIGNAL, SchemaBuilder.string().build())
+ .build();
/**
* Synchronize config with other workers.
*/
@@ -100,6 +98,7 @@ public class StateManagementServiceImpl implements
StateManagementService {
private RecordConverter converter = new
org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter();
private String statusTopic;
+
/**
* Preparation before startup
*
@@ -127,23 +126,23 @@ public class StateManagementServiceImpl implements
StateManagementService {
this.statusTopic = config.getConnectStatusTopic();
this.dataSynchronizer = new BrokerBasedLog(config,
- this.statusTopic,
- ConnectUtil.createGroupName(statusManagePrefix,
config.getWorkerId()),
- new StatusChangeCallback(),
- Serdes.serdeFrom(String.class),
- Serdes.serdeFrom(byte[].class));
+ this.statusTopic,
+ ConnectUtil.createGroupName(statusManagePrefix,
config.getWorkerId()),
+ new StatusChangeCallback(),
+ Serdes.serdeFrom(String.class),
+ Serdes.serdeFrom(byte[].class));
/**connector status store*/
this.connectorStatusStore = new FileBaseKeyValueStore<>(
-
FilePathConfigUtil.getConnectorStatusConfigPath(config.getStorePathRootDir()),
- new Serdes.StringSerde(),
- new JsonSerde(ConnectorStatus.class));
+
FilePathConfigUtil.getConnectorStatusConfigPath(config.getStorePathRootDir()),
+ new Serdes.StringSerde(),
+ new JsonSerde(ConnectorStatus.class));
/**task status store*/
this.taskStatusStore = new FileBaseKeyValueStore<>(
-
FilePathConfigUtil.getTaskStatusConfigPath(config.getStorePathRootDir()),
- new Serdes.StringSerde(),
- new ListSerde(TaskStatus.class));
+
FilePathConfigUtil.getTaskStatusConfigPath(config.getStorePathRootDir()),
+ new Serdes.StringSerde(),
+ new ListSerde(TaskStatus.class));
// create topic
this.prepare(config);
}
@@ -161,7 +160,7 @@ public class StateManagementServiceImpl implements
StateManagementService {
private void startSignal() {
Struct struct = new Struct(START_SIGNAL_V0);
- struct.put(START_SIGNAL,START_SIGNAL);
+ struct.put(START_SIGNAL, START_SIGNAL);
dataSynchronizer.send(START_SIGNAL,
converter.fromConnectData(statusTopic, START_SIGNAL_V0, struct));
}
@@ -206,6 +205,7 @@ public class StateManagementServiceImpl implements
StateManagementService {
});
});
}
+
/**
* pre persist
*/
@@ -254,7 +254,6 @@ public class StateManagementServiceImpl implements
StateManagementService {
sendConnectorStatus(status, true);
}
-
/**
* Set the state of the connector to the given value.
*
@@ -266,10 +265,9 @@ public class StateManagementServiceImpl implements
StateManagementService {
}
/**
- * Safely set the state of the task to the given value. What is
- * considered "safe" depends on the implementation, but basically it
- * means that the store can provide higher assurance that another worker
- * hasn't concurrently written any conflicting data.
+ * Safely set the state of the task to the given value. What is considered
"safe" depends on the implementation, but
+ * basically it means that the store can provide higher assurance that
another worker hasn't concurrently written
+ * any conflicting data.
*
* @param status the status of the task
*/
@@ -294,11 +292,10 @@ public class StateManagementServiceImpl implements
StateManagementService {
send(key, status, entry, safeWrite);
}
-
private <V extends AbstractStatus<?>> void send(final String key,
- final V status,
- final
ConnAndTaskStatus.CacheEntry<V> entry,
- final boolean safeWrite) {
+ final V status,
+ final ConnAndTaskStatus.CacheEntry<V> entry,
+ final boolean safeWrite) {
synchronized (this) {
if (safeWrite && !entry.canWrite(status)) {
return;
@@ -326,7 +323,6 @@ public class StateManagementServiceImpl implements
StateManagementService {
return converter.fromConnectData(this.statusTopic, STATUS_SCHEMA_V0,
struct);
}
-
/**
* Get the current state of the task.
*
@@ -395,15 +391,14 @@ public class StateManagementServiceImpl implements
StateManagementService {
return StagingMode.DISTRIBUTED;
}
-
private class StatusChangeCallback implements
DataSynchronizerCallback<String, byte[]> {
@Override
public void onCompletion(Throwable error, String key, byte[] value) {
- if (StringUtils.isEmpty(key)){
+ if (StringUtils.isEmpty(key)) {
log.error("State change message is illegal, key is empty, the
message will be skipped ");
return;
}
- if (key.equals(START_SIGNAL)){
+ if (key.equals(START_SIGNAL)) {
replicaTargetState();
} else if (key.startsWith(CONNECTOR_STATUS_PREFIX)) {
readConnectorStatus(key, value);
@@ -437,7 +432,7 @@ public class StateManagementServiceImpl implements
StateManagementService {
log.trace("Received connector {} status update {}", connector,
status);
ConnAndTaskStatus.CacheEntry<ConnectorStatus> entry =
connAndTaskStatus.getOrAdd(connector);
if (entry.get() != null) {
- if (status.getGeneration() > entry.get().getGeneration() ){
+ if (status.getGeneration() > entry.get().getGeneration()) {
entry.put(status);
}
} else {
@@ -446,7 +441,6 @@ public class StateManagementServiceImpl implements
StateManagementService {
}
}
-
private String parseConnectorStatusKey(String key) {
return key.substring(CONNECTOR_STATUS_PREFIX.length());
}
@@ -463,7 +457,7 @@ public class StateManagementServiceImpl implements
StateManagementService {
String trace = (String) struct.get(TRACE_KEY_NAME);
String workerUrl = (String) struct.get(WORKER_ID_KEY_NAME);
Long generation = (Long) struct.get(GENERATION_KEY_NAME);
- return new ConnectorStatus(connector, state, workerUrl,
generation, trace);
+ return new ConnectorStatus(connector, state, workerUrl,
generation, trace);
} catch (Exception e) {
log.error("Failed to deserialize connector status", e);
return null;
@@ -493,7 +487,7 @@ public class StateManagementServiceImpl implements
StateManagementService {
log.trace("Received task {} status update {}", id, status);
ConnAndTaskStatus.CacheEntry<TaskStatus> entry =
connAndTaskStatus.getOrAdd(id);
if (entry.get() != null) {
- if (status.getGeneration() > entry.get().getGeneration() ){
+ if (status.getGeneration() > entry.get().getGeneration()) {
entry.put(status);
}
} else {
@@ -536,7 +530,6 @@ public class StateManagementServiceImpl implements
StateManagementService {
}
}
-
/**
* remove connector
*
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/FileBaseKeyValueStore.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/FileBaseKeyValueStore.java
index e932860b..74937499 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/FileBaseKeyValueStore.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/FileBaseKeyValueStore.java
@@ -54,7 +54,7 @@ public class FileBaseKeyValueStore<K, V> extends
MemoryBasedKeyValueStore<K, V>
public String encode() {
Map<String, String> map = new HashMap<>();
for (K key : data.keySet()) {
- byte[] keyByte = serdeKey.serializer().serialize("",key);
+ byte[] keyByte = serdeKey.serializer().serialize("", key);
byte[] valueByte = serdeValue.serializer().serialize("",
data.get(key));
map.put(Base64Util.base64Encode(keyByte),
Base64Util.base64Encode(valueByte));
}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java
index a5932a44..21f5f309 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java
@@ -77,11 +77,11 @@ public class BrokerBasedLog<K, V> implements
DataSynchronizer<K, V> {
private Serde valueSerde;
public BrokerBasedLog(WorkerConfig connectConfig,
- String topicName,
- String workId,
- DataSynchronizerCallback<K, V>
dataSynchronizerCallback,
- Serde keySerde,
- Serde valueSerde) {
+ String topicName,
+ String workId,
+ DataSynchronizerCallback<K, V> dataSynchronizerCallback,
+ Serde keySerde,
+ Serde valueSerde) {
this.topicName = topicName;
this.keySerde = keySerde;
@@ -127,7 +127,7 @@ public class BrokerBasedLog<K, V> implements
DataSynchronizer<K, V> {
@Override
public void send(K key, V value) {
try {
- Map.Entry<byte[],byte[]> encode = encode(key, value);
+ Map.Entry<byte[], byte[]> encode = encode(key, value);
byte[] body = encode.getValue();
if (body.length > MAX_MESSAGE_SIZE) {
log.error("Message size is greater than {} bytes, key: {},
value {}", MAX_MESSAGE_SIZE, key, value);
@@ -161,7 +161,7 @@ public class BrokerBasedLog<K, V> implements
DataSynchronizer<K, V> {
@Override
public void send(K key, V value, Callback callback) {
try {
- Map.Entry<byte[],byte[]> encode = encode(key, value);
+ Map.Entry<byte[], byte[]> encode = encode(key, value);
byte[] body = encode.getValue();
if (body.length > MAX_MESSAGE_SIZE) {
log.error("Message size is greater than {} bytes, key: {},
value {}", MAX_MESSAGE_SIZE, key, value);
@@ -174,6 +174,7 @@ public class BrokerBasedLog<K, V> implements
DataSynchronizer<K, V> {
log.info("Send async message OK, msgId: {},topic:{}",
result.getMsgId(), topicName);
callback.onCompletion(null, value);
}
+
@Override public void onException(Throwable throwable) {
if (null != throwable) {
log.error("Send async message Failed, error: {}",
throwable);
@@ -186,19 +187,20 @@ public class BrokerBasedLog<K, V> implements
DataSynchronizer<K, V> {
}
}
-
- private Map.Entry<byte[],byte[]> encode(K key, V value) {
+ private Map.Entry<byte[], byte[]> encode(K key, V value) {
byte[] keySer = keySerde.serializer().serialize(topicName, key);
- byte[] valueSer = valueSerde.serializer().serialize(topicName,value);
- return new Map.Entry<byte[],byte[]>(){
+ byte[] valueSer = valueSerde.serializer().serialize(topicName, value);
+ return new Map.Entry<byte[], byte[]>() {
@Override
public byte[] getKey() {
return keySer;
}
+
@Override
public byte[] getValue() {
return valueSer;
}
+
@Override
public byte[] setValue(byte[] value) {
throw new UnsupportedOperationException();
@@ -207,17 +209,19 @@ public class BrokerBasedLog<K, V> implements
DataSynchronizer<K, V> {
}
private Map.Entry<K, V> decode(byte[] key, byte[] value) {
- K deKey = (K)keySerde.deserializer().deserialize(topicName, key);
- V deValue = (V)valueSerde.deserializer().deserialize(topicName, value);
- return new Map.Entry<K, V>(){
+ K deKey = (K) keySerde.deserializer().deserialize(topicName, key);
+ V deValue = (V) valueSerde.deserializer().deserialize(topicName,
value);
+ return new Map.Entry<K, V>() {
@Override
public K getKey() {
return deKey;
}
+
@Override
public V getValue() {
return deValue;
}
+
@Override
public V setValue(V value) {
throw new UnsupportedOperationException();
@@ -225,15 +229,15 @@ public class BrokerBasedLog<K, V> implements
DataSynchronizer<K, V> {
};
}
-
class MessageListenerImpl implements MessageListenerConcurrently {
@Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
rmqMsgList, ConsumeConcurrentlyContext context) {
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
rmqMsgList,
+ ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : rmqMsgList) {
log.info("Received one message: {}, topic is {}",
messageExt.getMsgId() + "\n", topicName);
try {
String key = messageExt.getKeys();
- Map.Entry<K,V> entry = decode(StringUtils.isEmpty(key) ?
null : Base64Util.base64Decode(key), messageExt.getBody());
+ Map.Entry<K, V> entry = decode(StringUtils.isEmpty(key) ?
null : Base64Util.base64Decode(key), messageExt.getBody());
dataSynchronizerCallback.onCompletion(null,
entry.getKey(), entry.getValue());
} catch (Exception e) {
log.error("Decode message data error. message: {}, error
info: {}", messageExt, e);
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
index 78e4f582..e017f565 100644
---
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
@@ -129,7 +129,7 @@ public class WorkerTest {
connectConfig.setStorePathRootDir(System.getProperty("user.home") +
File.separator + "testConnectorStore");
connectConfig.setNamesrvAddr("localhost:9876");
stateManagementService = new StateManagementServiceImpl();
- stateManagementService.initialize(connectConfig);
+ stateManagementService.initialize(connectConfig, new TestConverter());
worker = new Worker(connectConfig, positionManagementService,
configManagementService, plugin, connectController, stateManagementService);
Set<WorkerConnector> workingConnectors = new HashSet<>();