This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 7a12824e fix compile error
7a12824e is described below
commit 7a12824e56817e99c4a5f0272c007c30e06d61ff
Author: yuntian.zb <[email protected]>
AuthorDate: Mon Aug 29 20:46:15 2022 +0800
fix compile error
---
.../runtime/config/SourceConnectorConfig.java | 18 ++
.../rocketmq/connect/runtime/rest/RestHandler.java | 4 +-
.../WorkerSinkTaskContextTest.java | 6 +-
.../DistributedConnectControllerTest.java | 5 +-
.../distributed/TestConfigManagementService.java | 18 +-
.../controller/isolation/TestFileSystem.java | 4 +-
.../StandaloneConnectControllerTest.java | 6 +-
.../errors/DeadLetterQueueReporterTest.java | 7 +-
.../runtime/errors/ReporterManagerUtilTest.java | 4 +-
.../store/PositionStorageReaderImplTest.java | 6 +-
.../runtime/store/PositionStorageWriterTest.java | 6 +-
.../rocketmq/connect/transforms/ChangeCase.java | 159 ++++++-------
.../connect/transforms/ChangeCaseConfig.java | 55 ++---
.../connect/transforms/ExtractNestedField.java | 148 ++++++------
.../transforms/ExtractNestedFieldConfig.java | 64 +++---
.../rocketmq/connect/transforms/PatternFilter.java | 162 +++++++-------
.../connect/transforms/PatternFilterConfig.java | 55 +++--
.../rocketmq/connect/transforms/PatternRename.java | 198 +++++++++--------
.../connect/transforms/PatternRenameConfig.java | 79 ++++---
.../rocketmq/connect/transforms/RegexRouter.java | 7 +-
.../connect/transforms/SetMaximumPrecision.java | 247 +++++++++++----------
.../transforms/SetMaximumPrecisionConfig.java | 18 +-
.../rocketmq/connect/transforms/SetNull.java | 71 +++---
.../connect/transforms/util/ExtendKeyValue.java | 29 ++-
.../connect/transforms/util/SchemaHelper.java | 2 +-
.../connect/transforms/util/SchemaUtil.java | 53 +++--
26 files changed, 754 insertions(+), 677 deletions(-)
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SourceConnectorConfig.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SourceConnectorConfig.java
index 7fc12ec7..12012917 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SourceConnectorConfig.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SourceConnectorConfig.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
package org.apache.rocketmq.connect.runtime.config;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
index 6067bb53..9fc7e68e 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
@@ -44,8 +44,8 @@ import java.util.Set;
public class RestHandler {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
- private final String CONNECTOR_NAME = "connectorName";
- private final String TASK_NAME = "task";
+ private static final String CONNECTOR_NAME = "connectorName";
+ private static final String TASK_NAME = "task";
private final AbstractConnectController connectController;
/** connector plugin resource */
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContextTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContextTest.java
index f0dc3f27..060a81d7 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContextTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContextTest.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.assertj.core.api.Assertions;
@@ -42,7 +43,8 @@ public class WorkerSinkTaskContextTest {
@Mock
private WorkerSinkTask workerSinkTask;
- private DefaultMQPullConsumer defaultMQPullConsumer = new
DefaultMQPullConsumer();
+ private DefaultLitePullConsumer consumer = new DefaultLitePullConsumer();
+
private RecordPartition recordPartition;
@@ -58,7 +60,7 @@ public class WorkerSinkTaskContextTest {
Map<String, String> offset = new HashMap<>();
offset.put("queueOffset", "0");
recordOffset = new RecordOffset(offset);
- workerSinkTaskContext = new WorkerSinkTaskContext(connectKeyValue,
workerSinkTask, defaultMQPullConsumer);
+ workerSinkTaskContext = new WorkerSinkTaskContext(connectKeyValue,
workerSinkTask, consumer);
}
@Test
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectControllerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectControllerTest.java
index 0ca330a6..c39fca54 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectControllerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectControllerTest.java
@@ -28,6 +28,7 @@ import
org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
import
org.apache.rocketmq.connect.runtime.service.ClusterManagementServiceImpl;
import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.service.StateManagementService;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
@@ -58,6 +59,8 @@ public class DistributedConnectControllerTest {
private ServerResponseMocker brokerMocker;
+ private StateManagementService stateManagementService;
+
@Before
public void before() throws InterruptedException {
nameServerMocker = NameServerMocker.startByDefaultConf(9876, 10911);
@@ -65,7 +68,7 @@ public class DistributedConnectControllerTest {
connectConfig.setNamesrvAddr("127.0.0.1:9876");
clusterManagementService.initialize(connectConfig);
distributedConnectController = new
DistributedConnectController(plugin, distributedConfig,
clusterManagementService,
- configManagementService, positionManagementService);
+ configManagementService, positionManagementService,
stateManagementService);
}
@After
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/TestConfigManagementService.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/TestConfigManagementService.java
index b9c77cee..198c99a1 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/TestConfigManagementService.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/TestConfigManagementService.java
@@ -25,6 +25,7 @@ import
org.apache.rocketmq.connect.runtime.config.WorkerConfig;
import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
import org.apache.rocketmq.connect.runtime.service.StagingMode;
+import org.apache.rocketmq.connect.runtime.store.ClusterConfigState;
public class TestConfigManagementService implements ConfigManagementService {
@Override public void start() {
@@ -39,15 +40,20 @@ public class TestConfigManagementService implements
ConfigManagementService {
return null;
}
- @Override public Map<String, ConnectKeyValue>
getConnectorConfigsIncludeDeleted() {
+
+ @Override public String putConnectorConfig(String connectorName,
ConnectKeyValue configs) {
return null;
}
- @Override public String putConnectorConfig(String connectorName,
ConnectKeyValue configs) throws Exception {
- return null;
+ @Override public void deleteConnectorConfig(String connectorName) {
+
}
- @Override public void removeConnectorConfig(String connectorName) {
+ @Override public void pauseConnector(String connectorName) {
+
+ }
+
+ @Override public void resumeConnector(String connectorName) {
}
@@ -72,6 +78,10 @@ public class TestConfigManagementService implements
ConfigManagementService {
}
+ @Override public ClusterConfigState snapshot() {
+ return null;
+ }
+
@Override public Plugin getPlugin() {
return null;
}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/isolation/TestFileSystem.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/isolation/TestFileSystem.java
index 30e673aa..88eaa028 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/isolation/TestFileSystem.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/isolation/TestFileSystem.java
@@ -27,11 +27,11 @@ import java.nio.file.attribute.UserPrincipalLookupService;
import java.nio.file.spi.FileSystemProvider;
import java.util.Set;
import org.jetbrains.annotations.NotNull;
-import sun.nio.fs.MacOSXFileSystemProvider;
public class TestFileSystem extends FileSystem {
@Override public FileSystemProvider provider() {
- return new MacOSXFileSystemProvider();
+// return new MacOSXFileSystemProvider();
+ return null;
}
@Override public void close() throws IOException {
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectControllerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectControllerTest.java
index 7652f3f2..f74116e0 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectControllerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectControllerTest.java
@@ -27,6 +27,7 @@ import
org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
import
org.apache.rocketmq.connect.runtime.service.ClusterManagementServiceImpl;
import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.service.StateManagementService;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
@@ -51,6 +52,9 @@ public class StandaloneConnectControllerTest {
private PositionManagementService positionManagementService = new
TestPositionManageServiceImpl();
+ @Mock
+ private StateManagementService stateManagementService;
+
@Before
public void before() {
NameServerMocker.startByDefaultConf(9876, 10911);
@@ -59,7 +63,7 @@ public class StandaloneConnectControllerTest {
standaloneConfig.setHttpPort(10001);
clusterManagementService.initialize(standaloneConfig);
standaloneConnectController = new StandaloneConnectController(plugin,
standaloneConfig, clusterManagementService,
- configManagementService, positionManagementService);
+ configManagementService, positionManagementService,
stateManagementService);
}
@After
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporterTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporterTest.java
index e9c8e48a..ac072c95 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporterTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporterTest.java
@@ -23,7 +23,8 @@ import java.util.Map;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
-import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.config.SinkConnectorConfig;
+import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
import org.apache.rocketmq.connect.runtime.connectorwrapper.NameServerMocker;
import
org.apache.rocketmq.connect.runtime.connectorwrapper.ServerResponseMocker;
import org.assertj.core.api.Assertions;
@@ -84,9 +85,9 @@ public class DeadLetterQueueReporterTest {
private DeadLetterQueueReporter buildDeadLetterQueueReporter() {
ConnectKeyValue sinkConfig = new ConnectKeyValue();
Map<String, String> properties = new HashMap<>();
- properties.put(DeadLetterQueueConfig.DLQ_TOPIC_NAME_CONFIG,
"DEAD_LETTER_TOPIC");
+ properties.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG,
"DEAD_LETTER_TOPIC");
sinkConfig.setProperties(properties);
- ConnectConfig workerConfig = new ConnectConfig();
+ WorkerConfig workerConfig = new WorkerConfig();
final DeadLetterQueueReporter deadLetterQueueReporter =
DeadLetterQueueReporter.build("fileSinkConnector", sinkConfig, workerConfig);
return deadLetterQueueReporter;
}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/ReporterManagerUtilTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/ReporterManagerUtilTest.java
index db0c3137..91da86d1 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/ReporterManagerUtilTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/ReporterManagerUtilTest.java
@@ -20,7 +20,7 @@ package org.apache.rocketmq.connect.runtime.errors;
import io.openmessaging.connector.api.data.RecordConverter;
import java.util.List;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
-import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
import org.apache.rocketmq.connect.runtime.converter.record.StringConverter;
import org.junit.Assert;
import org.junit.Test;
@@ -48,7 +48,7 @@ public class ReporterManagerUtilTest {
@Test
public void sinkTaskReportersTest() {
- ConnectConfig workerConfig = new ConnectConfig();
+ WorkerConfig workerConfig = new WorkerConfig();
final List<ErrorReporter> connector =
ReporterManagerUtil.sinkTaskReporters("testConnector", connectKeyValue,
workerConfig);
Assert.assertEquals(1, connector.size());
}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/store/PositionStorageReaderImplTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/store/PositionStorageReaderImplTest.java
index 91ff23bd..15f0d010 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/store/PositionStorageReaderImplTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/store/PositionStorageReaderImplTest.java
@@ -24,7 +24,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
import org.apache.rocketmq.connect.runtime.connectorwrapper.NameServerMocker;
import
org.apache.rocketmq.connect.runtime.connectorwrapper.ServerResponseMocker;
import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
@@ -49,7 +49,7 @@ public class PositionStorageReaderImplTest {
private RecordOffset recordOffset;
- private ConnectConfig connectConfig;
+ private WorkerConfig connectConfig;
private ServerResponseMocker nameServerMocker;
@@ -60,7 +60,7 @@ public class PositionStorageReaderImplTest {
nameServerMocker = NameServerMocker.startByDefaultConf(9876, 10911);
brokerMocker = ServerResponseMocker.startServer(10911, "Hello
World".getBytes(StandardCharsets.UTF_8));
- connectConfig = new ConnectConfig();
+ connectConfig = new WorkerConfig();
connectConfig.setNamesrvAddr("localhost:9876");
positionManagementService = new PositionManagementServiceImpl();
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/store/PositionStorageWriterTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/store/PositionStorageWriterTest.java
index 381177de..b12625df 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/store/PositionStorageWriterTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/store/PositionStorageWriterTest.java
@@ -23,7 +23,7 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
-import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
import org.apache.rocketmq.connect.runtime.connectorwrapper.NameServerMocker;
import
org.apache.rocketmq.connect.runtime.connectorwrapper.ServerResponseMocker;
import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
@@ -40,7 +40,7 @@ public class PositionStorageWriterTest {
private PositionManagementService positionManagementService;
- private ConnectConfig connectConfig;
+ private WorkerConfig connectConfig;
private ServerResponseMocker nameServerMocker;
@@ -65,7 +65,7 @@ public class PositionStorageWriterTest {
offset.put("queueOffset", 0L);
recordOffset = new RecordOffset(offset);
- connectConfig = new ConnectConfig();
+ connectConfig = new WorkerConfig();
connectConfig.setNamesrvAddr("localhost:9876");
positionManagementService = new PositionManagementServiceImpl();
diff --git a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCase.java b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCase.java
index b8094d26..0e62498c 100644
--- a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCase.java
+++ b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCase.java
@@ -33,97 +33,98 @@ import java.util.Map;
/**
* change case
+ *
* @param <R>
*/
public abstract class ChangeCase<R extends ConnectRecord> extends
BaseTransformation<R> {
- private static final Logger log = LoggerFactory.getLogger(ChangeCase.class);
-
- class State {
- public final Map<String, String> columnMapping;
- public final Schema schema;
- State(Map<String, String> columnMapping, Schema schema) {
- this.columnMapping = columnMapping;
- this.schema = schema;
- }
- }
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ChangeCase.class);
- Map<Schema, State> schemaState = new HashMap<>();
- private ChangeCaseConfig config;
- @Override
- public void start(KeyValue config) {
- this.config = new ChangeCaseConfig(config);
- }
+ class State {
+ public final Map<String, String> columnMapping;
+ public final Schema schema;
- @Override
- protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct
input) {
- // state
- final State state = this.schemaState.computeIfAbsent(inputSchema, schema
-> {
- final SchemaBuilder builder = SchemaBuilder.struct();
- if (!Strings.isNullOrEmpty(schema.getName())) {
- builder.name(schema.getName());
- }
- if (schema.isOptional()) {
- builder.optional();
- }
- final Map<String, String> columnMapping = new LinkedHashMap<>();
- for (Field field : schema.getFields()) {
- final String newFieldName = this.config.from.to(this.config.to,
field.getName());
- log.trace("processStruct() - Mapped '{}' to '{}'", field.getName(),
newFieldName);
- columnMapping.put(field.getName(), newFieldName);
- builder.field(newFieldName, field.getSchema());
- }
- return new State(columnMapping, builder.build());
- });
+ State(Map<String, String> columnMapping, Schema schema) {
+ this.columnMapping = columnMapping;
+ this.schema = schema;
+ }
+ }
+ Map<Schema, State> schemaState = new HashMap<>();
+ private ChangeCaseConfig config;
- final Struct outputStruct = new Struct(state.schema);
- for (Map.Entry<String, String> kvp : state.columnMapping.entrySet()) {
- final Object value = input.get(kvp.getKey());
- outputStruct.put(kvp.getValue(), value);
+ @Override
+ public void start(KeyValue config) {
+ this.config = new ChangeCaseConfig(config);
}
- return new SchemaAndValue(state.schema, outputStruct);
- }
- /**
- * transform key
- */
- public static class Key extends ChangeCase<ConnectRecord> {
@Override
- public ConnectRecord doTransform(ConnectRecord r) {
- final SchemaAndValue transformed = process(r, r.getKeySchema(),
r.getKey());
- ConnectRecord record = new ConnectRecord(
- r.getPosition().getPartition(),
- r.getPosition().getOffset(),
- r.getTimestamp(),
- transformed.schema(),
- transformed.value(),
- r.getSchema(),
- r.getData()
- );
- record.setExtensions(r.getExtensions());
- return record;
+ protected SchemaAndValue processStruct(R record, Schema inputSchema,
Struct input) {
+ // state
+ final State state = this.schemaState.computeIfAbsent(inputSchema,
schema -> {
+ final SchemaBuilder builder = SchemaBuilder.struct();
+ if (!Strings.isNullOrEmpty(schema.getName())) {
+ builder.name(schema.getName());
+ }
+ if (schema.isOptional()) {
+ builder.optional();
+ }
+ final Map<String, String> columnMapping = new LinkedHashMap<>();
+ for (Field field : schema.getFields()) {
+ final String newFieldName =
this.config.from.to(this.config.to, field.getName());
+ LOGGER.trace("processStruct() - Mapped '{}' to '{}'",
field.getName(), newFieldName);
+ columnMapping.put(field.getName(), newFieldName);
+ builder.field(newFieldName, field.getSchema());
+ }
+ return new State(columnMapping, builder.build());
+ });
+
+ final Struct outputStruct = new Struct(state.schema);
+ for (Map.Entry<String, String> kvp : state.columnMapping.entrySet()) {
+ final Object value = input.get(kvp.getKey());
+ outputStruct.put(kvp.getValue(), value);
+ }
+ return new SchemaAndValue(state.schema, outputStruct);
}
- }
+ /**
+ * transform key
+ */
+ public static class Key extends ChangeCase<ConnectRecord> {
+ @Override
+ public ConnectRecord doTransform(ConnectRecord r) {
+ final SchemaAndValue transformed = process(r, r.getKeySchema(),
r.getKey());
+ ConnectRecord record = new ConnectRecord(
+ r.getPosition().getPartition(),
+ r.getPosition().getOffset(),
+ r.getTimestamp(),
+ transformed.schema(),
+ transformed.value(),
+ r.getSchema(),
+ r.getData()
+ );
+ record.setExtensions(r.getExtensions());
+ return record;
+ }
+ }
- /**
- * transform value
- */
- public static class Value extends ChangeCase<ConnectRecord> {
- @Override
- public ConnectRecord doTransform(ConnectRecord r) {
- final SchemaAndValue transformed = process(r, r.getSchema(),
r.getData());
- ConnectRecord record = new ConnectRecord(
- r.getPosition().getPartition(),
- r.getPosition().getOffset(),
- r.getTimestamp(),
- r.getKeySchema(),
- r.getKey(),
- transformed.schema(),
- transformed.value()
- );
- record.setExtensions(r.getExtensions());
- return record;
+ /**
+ * transform value
+ */
+ public static class Value extends ChangeCase<ConnectRecord> {
+ @Override
+ public ConnectRecord doTransform(ConnectRecord r) {
+ final SchemaAndValue transformed = process(r, r.getSchema(),
r.getData());
+ ConnectRecord record = new ConnectRecord(
+ r.getPosition().getPartition(),
+ r.getPosition().getOffset(),
+ r.getTimestamp(),
+ r.getKeySchema(),
+ r.getKey(),
+ transformed.schema(),
+ transformed.value()
+ );
+ record.setExtensions(r.getExtensions());
+ return record;
+ }
}
- }
}
diff --git a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCaseConfig.java b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCaseConfig.java
index fa9a5c99..805310b1 100644
--- a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCaseConfig.java
+++ b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCaseConfig.java
@@ -16,39 +16,40 @@
*/
package org.apache.rocketmq.connect.transforms;
-
import com.google.common.base.CaseFormat;
import io.openmessaging.KeyValue;
public class ChangeCaseConfig {
- public final CaseFormat from;
- public final CaseFormat to;
+ public final CaseFormat from;
+ public final CaseFormat to;
- public static final String FROM_CONFIG = "from";
- static final String FROM_DOC = "The format to move from ";
- public static final String TO_CONFIG = "to";
- static final String TO_DOC = "";
+ public static final String FROM_CONFIG = "from";
+ static final String FROM_DOC = "The format to move from ";
+ public static final String TO_CONFIG = "to";
+ static final String TO_DOC = "";
- public ChangeCaseConfig(KeyValue config) {
- String fromConfig = config.getString(FROM_CONFIG);
- this.from = CaseFormat.valueOf(fromConfig);
- String toConfig = config.getString(TO_CONFIG);
- this.to = CaseFormat.valueOf(toConfig);
- }
+ public ChangeCaseConfig(KeyValue config) {
+ String fromConfig = config.getString(FROM_CONFIG);
+ this.from = CaseFormat.valueOf(fromConfig);
+ String toConfig = config.getString(TO_CONFIG);
+ this.to = CaseFormat.valueOf(toConfig);
+ }
- /**
- * from
- * @return
- */
- public CaseFormat from(){
- return this.from;
- }
+ /**
+ * from
+ *
+ * @return
+ */
+ public CaseFormat from() {
+ return this.from;
+ }
- /**
- * to
- * @return
- */
- public CaseFormat to(){
- return this.to;
- }
+ /**
+ * to
+ *
+ * @return
+ */
+ public CaseFormat to() {
+ return this.to;
+ }
}
diff --git a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedField.java b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedField.java
index 122951d0..fc8bcfa8 100644
--- a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedField.java
+++ b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedField.java
@@ -32,91 +32,91 @@ import java.util.Map;
/**
* extract nested field
+ *
* @param <R>
*/
public abstract class ExtractNestedField<R extends ConnectRecord> extends
BaseTransformation<R> {
- private static final Logger log =
LoggerFactory.getLogger(ExtractNestedField.class);
- private ExtractNestedFieldConfig config;
- Map<Schema, Schema> schemaCache;
- @Override
- public void start(KeyValue keyValue) {
- this.config = new ExtractNestedFieldConfig(keyValue);
- this.schemaCache = new HashMap<>();
- }
-
-
- @Override
- protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct
input) {
- final Struct innerStruct = input.getStruct(this.config.outerFieldName);
- final Schema outputSchema = this.schemaCache.computeIfAbsent(inputSchema,
s -> {
+ private static final Logger log =
LoggerFactory.getLogger(ExtractNestedField.class);
+ private ExtractNestedFieldConfig config;
+ Map<Schema, Schema> schemaCache;
- final Field innerField =
innerStruct.schema().getField(this.config.innerFieldName);
- final SchemaBuilder builder = SchemaBuilder.struct();
- if (!Strings.isNullOrEmpty(inputSchema.getName())) {
- builder.name(inputSchema.getName());
- }
- if (inputSchema.isOptional()) {
- builder.optional();
- }
- for (Field inputField : inputSchema.getFields()) {
- builder.field(inputField.getName(), inputField.getSchema());
- }
- builder.field(this.config.innerFieldName, innerField.getSchema());
- return builder.build();
- });
- final Struct outputStruct = new Struct(outputSchema);
- for (Field inputField : inputSchema.getFields()) {
- final Object value = input.get(inputField);
- outputStruct.put(inputField.getName(), value);
+ @Override
+ public void start(KeyValue keyValue) {
+ this.config = new ExtractNestedFieldConfig(keyValue);
+ this.schemaCache = new HashMap<>();
}
- final Object innerFieldValue = innerStruct.get(this.config.innerFieldName);
- outputStruct.put(this.config.innerFieldName, innerFieldValue);
- return new SchemaAndValue(outputSchema, outputStruct);
+ @Override
+ protected SchemaAndValue processStruct(R record, Schema inputSchema,
Struct input) {
+ final Struct innerStruct = input.getStruct(this.config.outerFieldName);
+ final Schema outputSchema =
this.schemaCache.computeIfAbsent(inputSchema, s -> {
- }
+ final Field innerField =
innerStruct.schema().getField(this.config.innerFieldName);
+ final SchemaBuilder builder = SchemaBuilder.struct();
+ if (!Strings.isNullOrEmpty(inputSchema.getName())) {
+ builder.name(inputSchema.getName());
+ }
+ if (inputSchema.isOptional()) {
+ builder.optional();
+ }
+ for (Field inputField : inputSchema.getFields()) {
+ builder.field(inputField.getName(), inputField.getSchema());
+ }
+ builder.field(this.config.innerFieldName, innerField.getSchema());
+ return builder.build();
+ });
+ final Struct outputStruct = new Struct(outputSchema);
+ for (Field inputField : inputSchema.getFields()) {
+ final Object value = input.get(inputField);
+ outputStruct.put(inputField.getName(), value);
+ }
+ final Object innerFieldValue =
innerStruct.get(this.config.innerFieldName);
+ outputStruct.put(this.config.innerFieldName, innerFieldValue);
+
+ return new SchemaAndValue(outputSchema, outputStruct);
- /**
- * transform key
- */
- public static class Key extends ExtractNestedField<ConnectRecord> {
- @Override
- public ConnectRecord doTransform(ConnectRecord r) {
- final SchemaAndValue transformed = process(r, r.getKeySchema(),
r.getKey());
- ConnectRecord record = new ConnectRecord(
- r.getPosition().getPartition(),
- r.getPosition().getOffset(),
- r.getTimestamp(),
- transformed.schema(),
- transformed.value(),
- r.getSchema(),
- r.getData()
- );
- record.setExtensions(r.getExtensions());
- return record;
}
- }
+ /**
+ * transform key
+ */
+ public static class Key extends ExtractNestedField<ConnectRecord> {
+ @Override
+ public ConnectRecord doTransform(ConnectRecord r) {
+ final SchemaAndValue transformed = process(r, r.getKeySchema(),
r.getKey());
+ ConnectRecord record = new ConnectRecord(
+ r.getPosition().getPartition(),
+ r.getPosition().getOffset(),
+ r.getTimestamp(),
+ transformed.schema(),
+ transformed.value(),
+ r.getSchema(),
+ r.getData()
+ );
+ record.setExtensions(r.getExtensions());
+ return record;
+ }
+ }
- /**
- * transform value
- */
- public static class Value extends ExtractNestedField<ConnectRecord> {
- @Override
- public ConnectRecord doTransform(ConnectRecord r) {
- final SchemaAndValue transformed = process(r, r.getSchema(),
r.getData());
- ConnectRecord record = new ConnectRecord(
- r.getPosition().getPartition(),
- r.getPosition().getOffset(),
- r.getTimestamp(),
- r.getKeySchema(),
- r.getKey(),
- transformed.schema(),
- transformed.value()
- );
- record.setExtensions(r.getExtensions());
- return record;
+ /**
+ * transform value
+ */
+ public static class Value extends ExtractNestedField<ConnectRecord> {
+ @Override
+ public ConnectRecord doTransform(ConnectRecord r) {
+ final SchemaAndValue transformed = process(r, r.getSchema(),
r.getData());
+ ConnectRecord record = new ConnectRecord(
+ r.getPosition().getPartition(),
+ r.getPosition().getOffset(),
+ r.getTimestamp(),
+ r.getKeySchema(),
+ r.getKey(),
+ transformed.schema(),
+ transformed.value()
+ );
+ record.setExtensions(r.getExtensions());
+ return record;
+ }
}
- }
}
diff --git a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedFieldConfig.java b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedFieldConfig.java
index 9cd2f233..1c689a2a 100644
--- a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedFieldConfig.java
+++ b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedFieldConfig.java
@@ -18,41 +18,39 @@ package org.apache.rocketmq.connect.transforms;
import io.openmessaging.KeyValue;
-
/**
* extract nested field config
*/
-public class ExtractNestedFieldConfig{
- public final String outerFieldName;
- public final String innerFieldName;
- public final String outputFieldName;
-
- public ExtractNestedFieldConfig(KeyValue config) {
- this.outerFieldName = config.getString(OUTER_FIELD_NAME_CONF);
- this.innerFieldName = config.getString(INNER_FIELD_NAME_CONF);
- this.outputFieldName = config.getString(OUTPUT_FIELD_NAME_CONF);
- }
-
- public static final String OUTER_FIELD_NAME_CONF = "input.outer.field.name";
- static final String OUTER_FIELD_NAME_DOC = "The field on the parent struct
containing the child struct. " +
- "For example if you wanted the extract `address.state` you would use
`address`.";
- public static final String INNER_FIELD_NAME_CONF = "input.inner.field.name";
- static final String INNER_FIELD_NAME_DOC = "The field on the child struct
containing the field to be extracted. " +
- "For example if you wanted the extract `address.state` you would use
`state`.";
- public static final String OUTPUT_FIELD_NAME_CONF = "output.field.name";
- static final String OUTPUT_FIELD_NAME_DOC = "The field to place the
extracted value into.";
-
-
- public String outerFieldName(){
- return this.outerFieldName;
- }
-
- public String innerFieldName(){
- return this.innerFieldName;
- }
-
- public String outputFieldName(){
- return this.outerFieldName;
- }
+public class ExtractNestedFieldConfig {
+ public final String outerFieldName;
+ public final String innerFieldName;
+ public final String outputFieldName;
+
+ public ExtractNestedFieldConfig(KeyValue config) {
+ this.outerFieldName = config.getString(OUTER_FIELD_NAME_CONF);
+ this.innerFieldName = config.getString(INNER_FIELD_NAME_CONF);
+ this.outputFieldName = config.getString(OUTPUT_FIELD_NAME_CONF);
+ }
+
+ public static final String OUTER_FIELD_NAME_CONF =
"input.outer.field.name";
+ static final String OUTER_FIELD_NAME_DOC = "The field on the parent struct
containing the child struct. " +
+ "For example if you wanted the extract `address.state` you would use
`address`.";
+ public static final String INNER_FIELD_NAME_CONF =
"input.inner.field.name";
+ static final String INNER_FIELD_NAME_DOC = "The field on the child struct
containing the field to be extracted. " +
+ "For example if you wanted the extract `address.state` you would use
`state`.";
+ public static final String OUTPUT_FIELD_NAME_CONF = "output.field.name";
+ static final String OUTPUT_FIELD_NAME_DOC = "The field to place the
extracted value into.";
+
+ public String outerFieldName() {
+ return this.outerFieldName;
+ }
+
+ public String innerFieldName() {
+ return this.innerFieldName;
+ }
+
+ public String outputFieldName() {
+ return this.outerFieldName;
+ }
}
diff --git
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilter.java
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilter.java
index e2a71fff..c219535d 100644
---
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilter.java
+++
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilter.java
@@ -31,102 +31,104 @@ import java.util.regex.Pattern;
/**
* pattern filter
+ *
* @param <R>
*/
public abstract class PatternFilter<R extends ConnectRecord> extends
BaseTransformation<R> {
- private static final Logger log =
LoggerFactory.getLogger(PatternFilter.class);
- public Pattern pattern;
- public Set<String> fields;
+ private static final Logger log =
LoggerFactory.getLogger(PatternFilter.class);
+ public Pattern pattern;
+ public Set<String> fields;
- private PatternFilterConfig config;
- @Override
- public void start(KeyValue config) {
- this.config = new PatternFilterConfig(config);
- this.pattern = this.config.pattern();
- this.fields = this.config.fields();
- }
+ private PatternFilterConfig config;
+ @Override
+ public void start(KeyValue config) {
+ this.config = new PatternFilterConfig(config);
+ this.pattern = this.config.pattern();
+ this.fields = this.config.fields();
+ }
- R filter(R record, Struct struct) {
- for (Field field : struct.schema().getFields()) {
- if (!this.fields.contains(field.getName()) ||
field.getSchema().getFieldType() != FieldType.STRING) {
- continue;
- }
- String input = struct.getString(field.getName());
- if (null != input) {
- if (this.pattern.matcher(input).matches()) {
- return null;
+ R filter(R record, Struct struct) {
+ for (Field field : struct.schema().getFields()) {
+ if (!this.fields.contains(field.getName()) ||
field.getSchema().getFieldType() != FieldType.STRING) {
+ continue;
+ }
+ String input = struct.getString(field.getName());
+ if (null != input) {
+ if (this.pattern.matcher(input).matches()) {
+ return null;
+ }
+ }
}
- }
+ return record;
}
- return record;
- }
- /**
- * filter map
- * @param record
- * @param map
- * @return
- */
- R filter(R record, Map map) {
- for (Object field : map.keySet()) {
- if (!this.fields.contains(field)) {
- continue;
- }
- Object value = map.get(field);
- if (value instanceof String) {
- String input = (String) value;
- if (this.pattern.matcher(input).matches()) {
- return null;
+ /**
+ * filter map
+ *
+ * @param record
+ * @param map
+ * @return
+ */
+ R filter(R record, Map map) {
+ for (Object field : map.keySet()) {
+ if (!this.fields.contains(field)) {
+ continue;
+ }
+ Object value = map.get(field);
+ if (value instanceof String) {
+ String input = (String) value;
+ if (this.pattern.matcher(input).matches()) {
+ return null;
+ }
+ }
}
- }
+ return record;
}
- return record;
- }
-
- R filter(R record, final boolean key) {
- final SchemaAndValue input = key ?
- new SchemaAndValue(record.getKeySchema(), record.getKey()) :
- new SchemaAndValue(record.getSchema(), record.getData());
- final R result;
- if (input.schema() != null) {
- if (FieldType.STRUCT == input.schema().getFieldType()) {
- result = filter(record, (Struct) input.value());
- } else if (FieldType.MAP == input.schema().getFieldType()) {
- result = filter(record, (Map) input.value());
- } else {
- result = record;
- }
- } else if (input.value() instanceof Map) {
- result = filter(record, (Map) input.value());
- } else {
- result = record;
+ R filter(R record, final boolean key) {
+ final SchemaAndValue input = key ?
+ new SchemaAndValue(record.getKeySchema(), record.getKey()) :
+ new SchemaAndValue(record.getSchema(), record.getData());
+ final R result;
+ if (input.schema() != null) {
+ if (FieldType.STRUCT == input.schema().getFieldType()) {
+ result = filter(record, (Struct) input.value());
+ } else if (FieldType.MAP == input.schema().getFieldType()) {
+ result = filter(record, (Map) input.value());
+ } else {
+ result = record;
+ }
+ } else if (input.value() instanceof Map) {
+ result = filter(record, (Map) input.value());
+ } else {
+ result = record;
+ }
+ return result;
}
- return result;
- }
-
- /**
- * filter key
- * @param <R>
- */
- public static class Key<R extends ConnectRecord> extends PatternFilter<R> {
- @Override
- public R doTransform(R r) {
- return filter(r, true);
+ /**
+ * filter key
+ *
+ * @param <R>
+ */
+ public static class Key<R extends ConnectRecord> extends PatternFilter<R> {
+ @Override
+ public R doTransform(R r) {
+ return filter(r, true);
+ }
}
- }
- /**
- * filter value
- * @param <R>
- */
- public static class Value<R extends ConnectRecord> extends PatternFilter<R> {
- @Override
- public R doTransform(R r) {
- return filter(r, false);
+ /**
+ * filter value
+ *
+ * @param <R>
+ */
+ public static class Value<R extends ConnectRecord> extends
PatternFilter<R> {
+ @Override
+ public R doTransform(R r) {
+ return filter(r, false);
+ }
}
- }
}
diff --git
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilterConfig.java
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilterConfig.java
index 543ddf7d..00367b0c 100644
---
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilterConfig.java
+++
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilterConfig.java
@@ -26,38 +26,37 @@ import java.util.Set;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
-
/**
* pattern filter config
*/
-public class PatternFilterConfig{
-
- public static final String PATTERN_CONFIG = "pattern";
- public static final String PATTERN_DOC = "The regex to test the message
with. ";
-
- public static final String FIELD_CONFIG = "fields";
- public static final String FIELD_DOC = "The fields to transform.";
-
-
- private final Pattern pattern;
- private final Set<String> fields;
- public PatternFilterConfig(KeyValue config){
- ExtendKeyValue extendKeyValue = new ExtendKeyValue(config);
- String pattern = extendKeyValue.getString(PATTERN_CONFIG);
- try {
- this.pattern = Pattern.compile(pattern);
- } catch (PatternSyntaxException var4) {
- throw new ConnectException(String.format("Could not compile regex
'%s'.", pattern));
+public class PatternFilterConfig {
+
+ public static final String PATTERN_CONFIG = "pattern";
+ public static final String PATTERN_DOC = "The regex to test the message
with. ";
+
+ public static final String FIELD_CONFIG = "fields";
+ public static final String FIELD_DOC = "The fields to transform.";
+
+ private final Pattern pattern;
+ private final Set<String> fields;
+
+ public PatternFilterConfig(KeyValue config) {
+ ExtendKeyValue extendKeyValue = new ExtendKeyValue(config);
+ String pattern = extendKeyValue.getString(PATTERN_CONFIG);
+ try {
+ this.pattern = Pattern.compile(pattern);
+ } catch (PatternSyntaxException var4) {
+ throw new ConnectException(String.format("Could not compile regex
'%s'.", pattern));
+ }
+ List<String> fields = extendKeyValue.getList(FIELD_CONFIG);
+ this.fields = new HashSet<>(fields);
}
- List<String> fields = extendKeyValue.getList(FIELD_CONFIG);
- this.fields = new HashSet<>(fields);
- }
- public Pattern pattern() {
- return this.pattern;
- }
+ public Pattern pattern() {
+ return this.pattern;
+ }
- public Set<String> fields() {
- return this.fields;
- }
+ public Set<String> fields() {
+ return this.fields;
+ }
}
diff --git
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRename.java
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRename.java
index d27e3300..a36cb314 100644
---
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRename.java
+++
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRename.java
@@ -33,114 +33,116 @@ import java.util.regex.Matcher;
/**
* pattern rename
+ *
* @param <R>
*/
public abstract class PatternRename<R extends ConnectRecord> extends
BaseTransformation<R> {
- private static final Logger log =
LoggerFactory.getLogger(PatternRename.class);
- PatternRenameConfig config;
- @Override
- public void start(KeyValue keyValue) {
- config = new PatternRenameConfig(keyValue);
- }
+ private static final Logger log =
LoggerFactory.getLogger(PatternRename.class);
+ PatternRenameConfig config;
- @Override
- protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct
inputStruct) {
- final SchemaBuilder outputSchemaBuilder = SchemaBuilder.struct();
- outputSchemaBuilder.name(inputSchema.getName());
- outputSchemaBuilder.doc(inputSchema.getDoc());
- if (null != inputSchema.getDefaultValue()) {
- outputSchemaBuilder.defaultValue(inputSchema.getDefaultValue());
- }
- if (null != inputSchema.getParameters() &&
!inputSchema.getParameters().isEmpty()) {
- outputSchemaBuilder.parameters(inputSchema.getParameters());
- }
- if (inputSchema.isOptional()) {
- outputSchemaBuilder.optional();
- }
- Map<String, String> fieldMappings = new
HashMap<>(inputSchema.getFields().size());
- for (final Field inputField : inputSchema.getFields()) {
- log.trace("process() - Processing field '{}'", inputField.getName());
- final Matcher fieldMatcher =
this.config.pattern.matcher(inputField.getName());
- final String outputFieldName;
- if (fieldMatcher.find()) {
- // replace
- outputFieldName = fieldMatcher.replaceAll(this.config.replacement);
- } else {
- outputFieldName = inputField.getName();
- }
- log.trace("process() - Mapping field '{}' to '{}'",
inputField.getName(), outputFieldName);
- fieldMappings.put(inputField.getName(), outputFieldName);
- outputSchemaBuilder.field(outputFieldName, inputField.getSchema());
- }
- final Schema outputSchema = outputSchemaBuilder.build();
- final Struct outputStruct = new Struct(outputSchema);
- for (Map.Entry<String, String> entry : fieldMappings.entrySet()) {
- final String inputField = entry.getKey(), outputField = entry.getValue();
- log.trace("process() - Copying '{}' to '{}'", inputField, outputField);
- final Object value = inputStruct.get(inputField);
- outputStruct.put(outputField, value);
+ @Override
+ public void start(KeyValue keyValue) {
+ config = new PatternRenameConfig(keyValue);
}
- return new SchemaAndValue(outputSchema, outputStruct);
- }
- @Override
- protected SchemaAndValue processMap(R record, Map<String, Object> input) {
- final Map<String, Object> outputMap = new LinkedHashMap<>(input.size());
- for (final String inputFieldName : input.keySet()) {
- log.trace("process() - Processing field '{}'", inputFieldName);
- final Matcher fieldMatcher = this.config.pattern.matcher(inputFieldName);
- final String outputFieldName;
- // replace
- if (fieldMatcher.find()) {
- outputFieldName = fieldMatcher.replaceAll(this.config.replacement);
- } else {
- outputFieldName = inputFieldName;
- }
- final Object value = input.get(inputFieldName);
- outputMap.put(outputFieldName, value);
+ @Override
+ protected SchemaAndValue processStruct(R record, Schema inputSchema,
Struct inputStruct) {
+ final SchemaBuilder outputSchemaBuilder = SchemaBuilder.struct();
+ outputSchemaBuilder.name(inputSchema.getName());
+ outputSchemaBuilder.doc(inputSchema.getDoc());
+ if (null != inputSchema.getDefaultValue()) {
+ outputSchemaBuilder.defaultValue(inputSchema.getDefaultValue());
+ }
+ if (null != inputSchema.getParameters() &&
!inputSchema.getParameters().isEmpty()) {
+ outputSchemaBuilder.parameters(inputSchema.getParameters());
+ }
+ if (inputSchema.isOptional()) {
+ outputSchemaBuilder.optional();
+ }
+ Map<String, String> fieldMappings = new
HashMap<>(inputSchema.getFields().size());
+ for (final Field inputField : inputSchema.getFields()) {
+ log.trace("process() - Processing field '{}'",
inputField.getName());
+ final Matcher fieldMatcher =
this.config.pattern.matcher(inputField.getName());
+ final String outputFieldName;
+ if (fieldMatcher.find()) {
+ // replace
+ outputFieldName =
fieldMatcher.replaceAll(this.config.replacement);
+ } else {
+ outputFieldName = inputField.getName();
+ }
+ log.trace("process() - Mapping field '{}' to '{}'",
inputField.getName(), outputFieldName);
+ fieldMappings.put(inputField.getName(), outputFieldName);
+ outputSchemaBuilder.field(outputFieldName, inputField.getSchema());
+ }
+ final Schema outputSchema = outputSchemaBuilder.build();
+ final Struct outputStruct = new Struct(outputSchema);
+ for (Map.Entry<String, String> entry : fieldMappings.entrySet()) {
+ final String inputField = entry.getKey(), outputField =
entry.getValue();
+ log.trace("process() - Copying '{}' to '{}'", inputField,
outputField);
+ final Object value = inputStruct.get(inputField);
+ outputStruct.put(outputField, value);
+ }
+ return new SchemaAndValue(outputSchema, outputStruct);
}
- return new SchemaAndValue(null, outputMap);
- }
- /**
- * transform key
- */
- public static class Key extends PatternRename<ConnectRecord> {
@Override
- public ConnectRecord doTransform(ConnectRecord r) {
- final SchemaAndValue transformed = process(r, r.getKeySchema(),
r.getKey());
- ConnectRecord record = new ConnectRecord(
- r.getPosition().getPartition(),
- r.getPosition().getOffset(),
- r.getTimestamp(),
- transformed.schema(),
- transformed.value(),
- r.getSchema(),
- r.getData()
- );
- record.setExtensions(r.getExtensions());
- return record;
+ protected SchemaAndValue processMap(R record, Map<String, Object> input) {
+ final Map<String, Object> outputMap = new
LinkedHashMap<>(input.size());
+ for (final String inputFieldName : input.keySet()) {
+ log.trace("process() - Processing field '{}'", inputFieldName);
+ final Matcher fieldMatcher =
this.config.pattern.matcher(inputFieldName);
+ final String outputFieldName;
+ // replace
+ if (fieldMatcher.find()) {
+ outputFieldName =
fieldMatcher.replaceAll(this.config.replacement);
+ } else {
+ outputFieldName = inputFieldName;
+ }
+ final Object value = input.get(inputFieldName);
+ outputMap.put(outputFieldName, value);
+ }
+ return new SchemaAndValue(null, outputMap);
}
- }
- /**
- * transform value
- */
- public static class Value extends PatternRename<ConnectRecord> {
- @Override
- public ConnectRecord doTransform(ConnectRecord r) {
- final SchemaAndValue transformed = process(r, r.getSchema(),
r.getData());
- ConnectRecord record = new ConnectRecord(
- r.getPosition().getPartition(),
- r.getPosition().getOffset(),
- r.getTimestamp(),
- r.getKeySchema(),
- r.getKey(),
- transformed.schema(),
- transformed.value()
- );
- record.setExtensions(r.getExtensions());
- return record;
+ /**
+ * transform key
+ */
+ public static class Key extends PatternRename<ConnectRecord> {
+ @Override
+ public ConnectRecord doTransform(ConnectRecord r) {
+ final SchemaAndValue transformed = process(r, r.getKeySchema(),
r.getKey());
+ ConnectRecord record = new ConnectRecord(
+ r.getPosition().getPartition(),
+ r.getPosition().getOffset(),
+ r.getTimestamp(),
+ transformed.schema(),
+ transformed.value(),
+ r.getSchema(),
+ r.getData()
+ );
+ record.setExtensions(r.getExtensions());
+ return record;
+ }
+ }
+
+ /**
+ * transform value
+ */
+ public static class Value extends PatternRename<ConnectRecord> {
+ @Override
+ public ConnectRecord doTransform(ConnectRecord r) {
+ final SchemaAndValue transformed = process(r, r.getSchema(),
r.getData());
+ ConnectRecord record = new ConnectRecord(
+ r.getPosition().getPartition(),
+ r.getPosition().getOffset(),
+ r.getTimestamp(),
+ r.getKeySchema(),
+ r.getKey(),
+ transformed.schema(),
+ transformed.value()
+ );
+ record.setExtensions(r.getExtensions());
+ return record;
+ }
}
- }
}
diff --git
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRenameConfig.java
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRenameConfig.java
index b7c0e9ef..52ddf0ad 100644
---
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRenameConfig.java
+++
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRenameConfig.java
@@ -30,53 +30,52 @@ import java.util.regex.Pattern;
*/
public class PatternRenameConfig {
- public static final String FIELD_PATTERN_CONF = "field.pattern";
- static final String FIELD_PATTERN_DOC = "";
+ public static final String FIELD_PATTERN_CONF = "field.pattern";
+ static final String FIELD_PATTERN_DOC = "";
- public static final String FIELD_PATTERN_FLAGS_CONF = "field.pattern.flags";
- static final String FIELD_PATTERN_FLAGS_DOC = "";
+ public static final String FIELD_PATTERN_FLAGS_CONF =
"field.pattern.flags";
+ static final String FIELD_PATTERN_FLAGS_DOC = "";
- public static final String FIELD_REPLACEMENT_CONF = "field.replacement";
- static final String FIELD_REPLACEMENT_DOC = "";
+ public static final String FIELD_REPLACEMENT_CONF = "field.replacement";
+ static final String FIELD_REPLACEMENT_DOC = "";
- static final Map<String, Integer> FLAG_VALUES;
+ static final Map<String, Integer> FLAG_VALUES;
- static {
- Map<String, Integer> map = new HashMap<>();
- map.put("UNICODE_CHARACTER_CLASS", Pattern.UNICODE_CHARACTER_CLASS);
- map.put("CANON_EQ", Pattern.CANON_EQ);
- map.put("UNICODE_CASE", Pattern.UNICODE_CASE);
- map.put("DOTALL", Pattern.DOTALL);
- map.put("LITERAL", Pattern.LITERAL);
- map.put("MULTILINE", Pattern.MULTILINE);
- map.put("COMMENTS", Pattern.COMMENTS);
- map.put("CASE_INSENSITIVE", Pattern.CASE_INSENSITIVE);
- map.put("UNIX_LINES", Pattern.UNIX_LINES);
- FLAG_VALUES = ImmutableMap.copyOf(map);
- }
+ static {
+ Map<String, Integer> map = new HashMap<>();
+ map.put("UNICODE_CHARACTER_CLASS", Pattern.UNICODE_CHARACTER_CLASS);
+ map.put("CANON_EQ", Pattern.CANON_EQ);
+ map.put("UNICODE_CASE", Pattern.UNICODE_CASE);
+ map.put("DOTALL", Pattern.DOTALL);
+ map.put("LITERAL", Pattern.LITERAL);
+ map.put("MULTILINE", Pattern.MULTILINE);
+ map.put("COMMENTS", Pattern.COMMENTS);
+ map.put("CASE_INSENSITIVE", Pattern.CASE_INSENSITIVE);
+ map.put("UNIX_LINES", Pattern.UNIX_LINES);
+ FLAG_VALUES = ImmutableMap.copyOf(map);
+ }
- public final Pattern pattern;
- public final String replacement;
+ public final Pattern pattern;
+ public final String replacement;
- public PatternRenameConfig(KeyValue config) {
- ExtendKeyValue extendConfig = new ExtendKeyValue(config);
- final String pattern = extendConfig.getString(FIELD_PATTERN_CONF);
- final List<String> flagList =
extendConfig.getList(FIELD_PATTERN_FLAGS_CONF);
- int patternFlags = 0;
- for (final String f : flagList) {
- final int flag = FLAG_VALUES.get(f);
- patternFlags = patternFlags | flag;
+ public PatternRenameConfig(KeyValue config) {
+ ExtendKeyValue extendConfig = new ExtendKeyValue(config);
+ final String pattern = extendConfig.getString(FIELD_PATTERN_CONF);
+ final List<String> flagList =
extendConfig.getList(FIELD_PATTERN_FLAGS_CONF);
+ int patternFlags = 0;
+ for (final String f : flagList) {
+ final int flag = FLAG_VALUES.get(f);
+ patternFlags = patternFlags | flag;
+ }
+ this.pattern = Pattern.compile(pattern, patternFlags);
+ this.replacement = config.getString(FIELD_REPLACEMENT_CONF);
}
- this.pattern = Pattern.compile(pattern, patternFlags);
- this.replacement = config.getString(FIELD_REPLACEMENT_CONF);
- }
-
- public Pattern pattern() {
- return pattern;
- }
+ public Pattern pattern() {
+ return pattern;
+ }
- public String replacement() {
- return replacement;
- }
+ public String replacement() {
+ return replacement;
+ }
}
diff --git
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/RegexRouter.java
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/RegexRouter.java
index ff4b94fb..c2020250 100644
---
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/RegexRouter.java
+++
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/RegexRouter.java
@@ -27,14 +27,15 @@ import java.util.regex.Pattern;
/**
* regex router
+ *
* @param <R>
*/
public abstract class RegexRouter<R extends ConnectRecord> extends
BaseTransformation<R> {
private static final Logger LOG =
LoggerFactory.getLogger(RegexRouter.class);
public static final String TOPIC = "topic";
public static final String OVERVIEW_DOC = "Update the record topic using
the configured regular expression and replacement string."
- + "<p/>Under the hood, the regex is compiled to a
<code>java.util.regex.Pattern</code>. "
- + "If the pattern matches the input topic,
<code>java.util.regex.Matcher#replaceFirst()</code> is used with the
replacement string to obtain the new topic.";
+ + "<p/>Under the hood, the regex is compiled to a
<code>java.util.regex.Pattern</code>. "
+ + "If the pattern matches the input topic,
<code>java.util.regex.Matcher#replaceFirst()</code> is used with the
replacement string to obtain the new topic.";
private interface ConfigName {
String REGEX = "regex";
@@ -52,7 +53,7 @@ public abstract class RegexRouter<R extends ConnectRecord>
extends BaseTransform
@Override
public R doTransform(R record) {
- Map<String, Object> partitionMap = (Map<String,
Object>)record.getPosition().getPartition().getPartition();
+ Map<String, Object> partitionMap = (Map<String, Object>)
record.getPosition().getPartition().getPartition();
if (null == partitionMap || !partitionMap.containsKey(TOPIC)) {
LOG.warn("PartitionMap get topic is null , lack of topic config");
return record;
diff --git
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecision.java
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecision.java
index 3ba40c6f..d31ce913 100644
---
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecision.java
+++
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecision.java
@@ -36,151 +36,152 @@ import java.util.stream.Collectors;
/**
* set maximum precision
+ *
* @param <R>
*/
public abstract class SetMaximumPrecision<R extends ConnectRecord> extends
BaseTransformation<R> {
- private static final Logger log =
LoggerFactory.getLogger(SetMaximumPrecision.class);
+ private static final Logger log =
LoggerFactory.getLogger(SetMaximumPrecision.class);
- SetMaximumPrecisionConfig config;
+ SetMaximumPrecisionConfig config;
- @Override
- public void start(KeyValue keyValue) {
- config = new SetMaximumPrecisionConfig(keyValue);
- }
+ @Override
+ public void start(KeyValue keyValue) {
+ config = new SetMaximumPrecisionConfig(keyValue);
+ }
- static final State NOOP = new State(true, null, null);
+ static final State NOOP = new State(true, null, null);
- static class State {
- public final boolean noop;
- public final Schema outputSchema;
- public final Set<String> decimalFields;
+ static class State {
+ public final boolean noop;
+ public final Schema outputSchema;
+ public final Set<String> decimalFields;
- State(boolean noop, Schema outputSchema, Set<String> decimalFields) {
- this.noop = noop;
- this.outputSchema = outputSchema;
- this.decimalFields = decimalFields;
+ State(boolean noop, Schema outputSchema, Set<String> decimalFields) {
+ this.noop = noop;
+ this.outputSchema = outputSchema;
+ this.decimalFields = decimalFields;
+ }
}
- }
- Map<Schema, State> schemaLookup = new HashMap<>();
+ Map<Schema, State> schemaLookup = new HashMap<>();
+
+ public static final String CONNECT_AVRO_DECIMAL_PRECISION_PROP =
"connect.decimal.precision";
+
+ State state(Schema inputSchema) {
+ return this.schemaLookup.computeIfAbsent(inputSchema, new
Function<Schema, State>() {
+ @Override
+ public State apply(Schema schema) {
+ Set<String> decimalFields = inputSchema.getFields().stream()
+ .filter(f ->
Decimal.LOGICAL_NAME.equals(f.getSchema().getName()))
+ .filter(f ->
Integer.parseInt(f.getSchema().getParameters().getOrDefault(CONNECT_AVRO_DECIMAL_PRECISION_PROP,
"64")) > config.maxPrecision())
+ .map(Field::getName)
+ .collect(Collectors.toSet());
+ State result;
+
+ if (decimalFields.size() == 0) {
+ result = NOOP;
+ } else {
+ log.trace("state() - processing schema '{}'",
schema.getName());
+ SchemaBuilder builder = SchemaBuilder.struct()
+ .name(inputSchema.getName())
+ .doc(inputSchema.getDoc())
+ .version(inputSchema.getVersion());
+ if (null != inputSchema.getParameters() &&
!inputSchema.getParameters().isEmpty()) {
+ builder.parameters(inputSchema.getParameters());
+ }
+
+ for (Field field : inputSchema.getFields()) {
+ log.trace("state() - processing field '{}'",
field.getName());
+ if (decimalFields.contains(field.getName())) {
+ Map<String, String> parameters = new
LinkedHashMap<>();
+ if (null != field.getSchema().getParameters() &&
!field.getSchema().getParameters().isEmpty()) {
+
parameters.putAll(field.getSchema().getParameters());
+ }
+
parameters.put(CONNECT_AVRO_DECIMAL_PRECISION_PROP,
Integer.toString(config.maxPrecision()));
+ int scale =
Integer.parseInt(parameters.get(Decimal.SCALE_FIELD));
+ SchemaBuilder fieldBuilder = Decimal.builder(scale)
+ .parameters(parameters)
+ .doc(field.getSchema().getDoc())
+ .version(field.getSchema().getVersion());
+ if (field.getSchema().isOptional()) {
+ fieldBuilder.optional();
+ }
+ Schema fieldSchema = fieldBuilder.build();
+ builder.field(field.getName(), fieldSchema);
+ } else {
+ log.trace("state() - copying field '{}' to new
schema.", field.getName());
+ builder.field(field.getName(), field.getSchema());
+ }
+ }
+
+ Schema outputSchema = builder.build();
+ result = new State(false, outputSchema, decimalFields);
+ }
+
+ return result;
+ }
+ });
- public static final String CONNECT_AVRO_DECIMAL_PRECISION_PROP =
"connect.decimal.precision";
+ }
- State state(Schema inputSchema) {
- return this.schemaLookup.computeIfAbsent(inputSchema, new Function<Schema,
State>() {
- @Override
- public State apply(Schema schema) {
- Set<String> decimalFields = inputSchema.getFields().stream()
- .filter(f -> Decimal.LOGICAL_NAME.equals(f.getSchema().getName()))
- .filter(f ->
Integer.parseInt(f.getSchema().getParameters().getOrDefault(CONNECT_AVRO_DECIMAL_PRECISION_PROP,
"64")) > config.maxPrecision())
- .map(Field::getName)
- .collect(Collectors.toSet());
- State result;
+ @Override
+ protected SchemaAndValue processStruct(R record, Schema inputSchema,
Struct input) {
+ State state = state(inputSchema);
+ SchemaAndValue result;
- if (decimalFields.size() == 0) {
- result = NOOP;
+ if (state.noop) {
+ result = new SchemaAndValue(inputSchema, input);
} else {
- log.trace("state() - processing schema '{}'", schema.getName());
- SchemaBuilder builder = SchemaBuilder.struct()
- .name(inputSchema.getName())
- .doc(inputSchema.getDoc())
- .version(inputSchema.getVersion());
- if (null != inputSchema.getParameters() &&
!inputSchema.getParameters().isEmpty()) {
- builder.parameters(inputSchema.getParameters());
- }
-
- for (Field field : inputSchema.getFields()) {
- log.trace("state() - processing field '{}'", field.getName());
- if (decimalFields.contains(field.getName())) {
- Map<String, String> parameters = new LinkedHashMap<>();
- if (null != field.getSchema().getParameters() &&
!field.getSchema().getParameters().isEmpty()) {
- parameters.putAll(field.getSchema().getParameters());
- }
- parameters.put(CONNECT_AVRO_DECIMAL_PRECISION_PROP,
Integer.toString(config.maxPrecision()));
- int scale =
Integer.parseInt(parameters.get(Decimal.SCALE_FIELD));
- SchemaBuilder fieldBuilder = Decimal.builder(scale)
- .parameters(parameters)
- .doc(field.getSchema().getDoc())
- .version(field.getSchema().getVersion());
- if (field.getSchema().isOptional()) {
- fieldBuilder.optional();
- }
- Schema fieldSchema = fieldBuilder.build();
- builder.field(field.getName(), fieldSchema);
- } else {
- log.trace("state() - copying field '{}' to new schema.",
field.getName());
- builder.field(field.getName(), field.getSchema());
+ Struct struct = new Struct(state.outputSchema);
+ for (Field field : inputSchema.getFields()) {
+ struct.put(field.getName(), input.get(field.getName()));
}
- }
-
- Schema outputSchema = builder.build();
- result = new State(false, outputSchema, decimalFields);
+ result = new SchemaAndValue(state.outputSchema, struct);
}
-
return result;
- }
- });
-
- }
-
- @Override
- protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct
input) {
- State state = state(inputSchema);
- SchemaAndValue result;
-
- if (state.noop) {
- result = new SchemaAndValue(inputSchema, input);
- } else {
- Struct struct = new Struct(state.outputSchema);
- for (Field field : inputSchema.getFields()) {
- struct.put(field.getName(), input.get(field.getName()));
- }
- result = new SchemaAndValue(state.outputSchema, struct);
}
- return result;
- }
- /**
- * transform key
- */
- public static class Key extends SetMaximumPrecision<ConnectRecord> {
- @Override
- public ConnectRecord doTransform(ConnectRecord r) {
- SchemaAndValue transformed = this.process(r, r.getKeySchema(),
r.getKey());
- ConnectRecord record = new ConnectRecord(
- r.getPosition().getPartition(),
- r.getPosition().getOffset(),
- r.getTimestamp(),
- transformed.schema(),
- transformed.value(),
- r.getSchema(),
- r.getData()
- );
- record.setExtensions(r.getExtensions());
- return record;
+ /**
+ * transform key
+ */
+ public static class Key extends SetMaximumPrecision<ConnectRecord> {
+ @Override
+ public ConnectRecord doTransform(ConnectRecord r) {
+ SchemaAndValue transformed = this.process(r, r.getKeySchema(),
r.getKey());
+ ConnectRecord record = new ConnectRecord(
+ r.getPosition().getPartition(),
+ r.getPosition().getOffset(),
+ r.getTimestamp(),
+ transformed.schema(),
+ transformed.value(),
+ r.getSchema(),
+ r.getData()
+ );
+ record.setExtensions(r.getExtensions());
+ return record;
+ }
}
- }
- /**
- * transform value
- */
- public static class Value extends SetMaximumPrecision<ConnectRecord> {
- @Override
- public ConnectRecord doTransform(ConnectRecord r) {
- SchemaAndValue transformed = this.process(r, r.getSchema(), r.getData());
- ConnectRecord record = new ConnectRecord(
- r.getPosition().getPartition(),
- r.getPosition().getOffset(),
- r.getTimestamp(),
- r.getKeySchema(),
- r.getKey(),
- transformed.schema(),
- transformed.value()
- );
- record.setExtensions(r.getExtensions());
- return record;
+ /**
+ * transform value
+ */
+ public static class Value extends SetMaximumPrecision<ConnectRecord> {
+ @Override
+ public ConnectRecord doTransform(ConnectRecord r) {
+ SchemaAndValue transformed = this.process(r, r.getSchema(),
r.getData());
+ ConnectRecord record = new ConnectRecord(
+ r.getPosition().getPartition(),
+ r.getPosition().getOffset(),
+ r.getTimestamp(),
+ r.getKeySchema(),
+ r.getKey(),
+ transformed.schema(),
+ transformed.value()
+ );
+ record.setExtensions(r.getExtensions());
+ return record;
+ }
}
- }
}
diff --git
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecisionConfig.java
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecisionConfig.java
index 9a801c39..f8902283 100644
---
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecisionConfig.java
+++
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecisionConfig.java
@@ -22,16 +22,16 @@ import io.openmessaging.KeyValue;
* set maximum precision config
*/
public class SetMaximumPrecisionConfig {
- public static final String MAX_PRECISION_CONFIG = "precision.max";
- static final String MAX_PRECISION_DOC = "The maximum precision allowed.";
+ public static final String MAX_PRECISION_CONFIG = "precision.max";
+ static final String MAX_PRECISION_DOC = "The maximum precision allowed.";
- private final int maxPrecision;
+ private final int maxPrecision;
- public SetMaximumPrecisionConfig(KeyValue config) {
- this.maxPrecision = config.getInt(MAX_PRECISION_CONFIG);
- }
+ public SetMaximumPrecisionConfig(KeyValue config) {
+ this.maxPrecision = config.getInt(MAX_PRECISION_CONFIG);
+ }
- public int maxPrecision(){
- return maxPrecision;
- }
+ public int maxPrecision() {
+ return maxPrecision;
+ }
}
diff --git
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetNull.java
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetNull.java
index 54ed09d4..aad03b95 100644
---
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetNull.java
+++
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetNull.java
@@ -23,50 +23,49 @@ import org.slf4j.LoggerFactory;
/**
* set null
+ *
* @param <R>
*/
public abstract class SetNull<R extends ConnectRecord> extends
BaseTransformation<R> {
- private static final Logger log = LoggerFactory.getLogger(SetNull.class);
-
-
- PatternRenameConfig config;
+ private static final Logger log = LoggerFactory.getLogger(SetNull.class);
- @Override
- public void start(KeyValue keyValue) {
- config = new PatternRenameConfig(keyValue);
- }
+ PatternRenameConfig config;
- /**
- * transform key
- */
- public static class Key extends SetNull<ConnectRecord> {
@Override
- public ConnectRecord doTransform(ConnectRecord r) {
- return new ConnectRecord(
- r.getPosition().getPartition(),
- r.getPosition().getOffset(),
- r.getTimestamp(),
- null,
- null,
- r.getSchema(),
- r.getData()
- );
+ public void start(KeyValue keyValue) {
+ config = new PatternRenameConfig(keyValue);
}
- }
+ /**
+ * transform key
+ */
+ public static class Key extends SetNull<ConnectRecord> {
+ @Override
+ public ConnectRecord doTransform(ConnectRecord r) {
+ return new ConnectRecord(
+ r.getPosition().getPartition(),
+ r.getPosition().getOffset(),
+ r.getTimestamp(),
+ null,
+ null,
+ r.getSchema(),
+ r.getData()
+ );
+ }
+ }
- public static class Value extends SetNull<ConnectRecord> {
- @Override
- public ConnectRecord doTransform(ConnectRecord r) {
- return new ConnectRecord(
- r.getPosition().getPartition(),
- r.getPosition().getOffset(),
- r.getTimestamp(),
- r.getKeySchema(),
- r.getKey(),
- r.getSchema(),
- r.getData()
- );
+ public static class Value extends SetNull<ConnectRecord> {
+ @Override
+ public ConnectRecord doTransform(ConnectRecord r) {
+ return new ConnectRecord(
+ r.getPosition().getPartition(),
+ r.getPosition().getOffset(),
+ r.getTimestamp(),
+ r.getKeySchema(),
+ r.getKey(),
+ r.getSchema(),
+ r.getData()
+ );
+ }
}
- }
}
diff --git
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/ExtendKeyValue.java
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/ExtendKeyValue.java
index 9297718f..48ae4ef4 100644
---
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/ExtendKeyValue.java
+++
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/ExtendKeyValue.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.rocketmq.connect.transforms.util;
import io.openmessaging.KeyValue;
@@ -14,9 +31,9 @@ import java.util.regex.Pattern;
public class ExtendKeyValue implements KeyValue {
private static final Pattern COMMA_WITH_WHITESPACE =
Pattern.compile("\\s*,\\s*");
-
private KeyValue config;
- public ExtendKeyValue(KeyValue config){
+
+ public ExtendKeyValue(KeyValue config) {
this.config = config;
}
@@ -92,11 +109,12 @@ public class ExtendKeyValue implements KeyValue {
/**
* get list
+ *
* @param s
* @return
*/
- public List getList(String s){
- if (!this.config.containsKey(s)){
+ public List getList(String s) {
+ if (!this.config.containsKey(s)) {
return new ArrayList();
}
String config = this.config.getString(s).trim();
@@ -105,6 +123,7 @@ public class ExtendKeyValue implements KeyValue {
/**
* get list by class
+ *
* @param s
* @param clazz
* @param <T>
@@ -113,7 +132,7 @@ public class ExtendKeyValue implements KeyValue {
public <T> List<T> getList(String s, Class<T> clazz) {
List configs = getList(s);
List<T> castConfigs = new ArrayList<>();
- configs.forEach(config ->{
+ configs.forEach(config -> {
castConfigs.add(clazz.cast(config));
});
return castConfigs;
diff --git
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaHelper.java
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaHelper.java
index 57c38c95..6a88b0c9 100644
---
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaHelper.java
+++
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaHelper.java
@@ -52,7 +52,7 @@ public class SchemaHelper {
if (!(input instanceof BigDecimal)) {
throw new
UnsupportedOperationException(String.format("Unsupported Type: %s",
input.getClass()));
}
- builder = Decimal.builder(((BigDecimal)input).scale());
+ builder = Decimal.builder(((BigDecimal) input).scale());
}
return builder.optional();
diff --git
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaUtil.java
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaUtil.java
index 2317a095..c53a7346 100644
---
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaUtil.java
+++
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaUtil.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.rocketmq.connect.transforms.util;
import io.openmessaging.connector.api.data.Schema;
@@ -7,23 +24,23 @@ import io.openmessaging.connector.api.data.SchemaBuilder;
* schema util
*/
public class SchemaUtil {
- public static Schema INT8_SCHEMA = SchemaBuilder.int8().build();
- public static Schema INT16_SCHEMA = SchemaBuilder.int16().build();
- public static Schema INT32_SCHEMA = SchemaBuilder.int32().build();
- public static Schema INT64_SCHEMA = SchemaBuilder.int64().build();
- public static Schema FLOAT32_SCHEMA = SchemaBuilder.float32().build();
- public static Schema FLOAT64_SCHEMA = SchemaBuilder.float64().build();
- public static Schema BOOLEAN_SCHEMA = SchemaBuilder.bool().build();
- public static Schema STRING_SCHEMA = SchemaBuilder.string().build();
- public static Schema BYTES_SCHEMA = SchemaBuilder.bytes().build();
+ public static final Schema INT8_SCHEMA = SchemaBuilder.int8().build();
+ public static final Schema INT16_SCHEMA = SchemaBuilder.int16().build();
+ public static final Schema INT32_SCHEMA = SchemaBuilder.int32().build();
+ public static final Schema INT64_SCHEMA = SchemaBuilder.int64().build();
+ public static final Schema FLOAT32_SCHEMA =
SchemaBuilder.float32().build();
+ public static final Schema FLOAT64_SCHEMA =
SchemaBuilder.float64().build();
+ public static final Schema BOOLEAN_SCHEMA = SchemaBuilder.bool().build();
+ public static final Schema STRING_SCHEMA = SchemaBuilder.string().build();
+ public static final Schema BYTES_SCHEMA = SchemaBuilder.bytes().build();
- public static Schema OPTIONAL_INT8_SCHEMA =
SchemaBuilder.int8().optional().build();
- public static Schema OPTIONAL_INT16_SCHEMA =
SchemaBuilder.int16().optional().build();
- public static Schema OPTIONAL_INT32_SCHEMA =
SchemaBuilder.int32().optional().build();
- public static Schema OPTIONAL_INT64_SCHEMA =
SchemaBuilder.int64().optional().build();
- public static Schema OPTIONAL_FLOAT32_SCHEMA =
SchemaBuilder.float32().optional().build();
- public static Schema OPTIONAL_FLOAT64_SCHEMA =
SchemaBuilder.float64().optional().build();
- public static Schema OPTIONAL_BOOLEAN_SCHEMA =
SchemaBuilder.bool().optional().build();
- public static Schema OPTIONAL_STRING_SCHEMA =
SchemaBuilder.string().optional().build();
- public static Schema OPTIONAL_BYTES_SCHEMA =
SchemaBuilder.bytes().optional().build();
+ public static final Schema OPTIONAL_INT8_SCHEMA =
SchemaBuilder.int8().optional().build();
+ public static final Schema OPTIONAL_INT16_SCHEMA =
SchemaBuilder.int16().optional().build();
+ public static final Schema OPTIONAL_INT32_SCHEMA =
SchemaBuilder.int32().optional().build();
+ public static final Schema OPTIONAL_INT64_SCHEMA =
SchemaBuilder.int64().optional().build();
+ public static final Schema OPTIONAL_FLOAT32_SCHEMA =
SchemaBuilder.float32().optional().build();
+ public static final Schema OPTIONAL_FLOAT64_SCHEMA =
SchemaBuilder.float64().optional().build();
+ public static final Schema OPTIONAL_BOOLEAN_SCHEMA =
SchemaBuilder.bool().optional().build();
+ public static final Schema OPTIONAL_STRING_SCHEMA =
SchemaBuilder.string().optional().build();
+ public static final Schema OPTIONAL_BYTES_SCHEMA =
SchemaBuilder.bytes().optional().build();
}