imaffe commented on a change in pull request #622:
URL: https://github.com/apache/rocketmq-externals/pull/622#discussion_r469678032
##########
File path:
rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestSender.java
##########
@@ -0,0 +1,38 @@
+package org.apache.rocketmq.connect.runtime.rest;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+
+import java.net.URLEncoder;
+
+public class RestSender {
+ public String sendHttpRequest(String baseUrl, String configs){
+ try {
+ CloseableHttpClient client = null;
+ CloseableHttpResponse response = null;
+ try {
+ String encoded_configs = URLEncoder.encode(configs,"utf-8");
+ HttpGet httpGet = new HttpGet(baseUrl + encoded_configs);
+ client = HttpClients.createDefault();
+ response = client.execute(httpGet);
+ HttpEntity entity = response.getEntity();
+ String result = EntityUtils.toString(entity);
+ System.out.println(result);
+ return result;
+ } finally {
+ if (response != null) {
+ response.close();
+ }
+ if (client != null) {
+ client.close();
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return "";
+ }
+}
Review comment:
(Optional) newline here
##########
File path:
rocketmq-connect/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/tools/command/ReloadPluginsSubCommand.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.tools.command;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.connect.tools.commom.Config;
+import org.apache.rocketmq.connect.tools.utils.RestSender;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class ReloadPluginsSubCommand implements SubCommand {
+
+ private final Config config;
+
+ public ReloadPluginsSubCommand(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public String commandName() {
+ return "reloadPlugins";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Reload the Connector file under the plugin directory";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options) throws
SubCommandException {
+ try {
+ String url = "http://" + config.getHttpAddr() + ":" +
config.getHttpPort() + "/plugin/reload";
Review comment:
(Optional) I remember there is a utility class for manipulating URL paths.
Could we avoid string concatenation and use the standard URL manipulation API
instead?
##########
File path:
rocketmq-connect/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/tools/command/QueryConnectorConfigSubCommand.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.tools.command;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import org.apache.rocketmq.connect.tools.commom.Config;
+import org.apache.rocketmq.connect.tools.utils.RestSender;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class QueryConnectorConfigSubCommand implements SubCommand {
+
+ private final Config config;
+
+ public QueryConnectorConfigSubCommand(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public String commandName() {
+ return "queryConnectorConfig";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Get configuration information for a connector";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ Option opt = new Option("c", "connectorName", true, "connector name");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options) throws
SubCommandException {
+ try {
+ String connectorName = commandLine.getOptionValue('c').trim();
+ String url = "http://" + config.getHttpAddr() + ":" +
config.getHttpPort() + "/connectors/" + connectorName + "/config";
Review comment:
We had better leave no magic values in the code, though many other places
are using magic values too. (Let's see whether it is possible to minimize the
use of magic values.)
##########
File path:
rocketmq-connect/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/tools/command/ReloadPluginsSubCommand.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.tools.command;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.connect.tools.commom.Config;
+import org.apache.rocketmq.connect.tools.utils.RestSender;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class ReloadPluginsSubCommand implements SubCommand {
+
+ private final Config config;
+
+ public ReloadPluginsSubCommand(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public String commandName() {
+ return "reloadPlugins";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Reload the Connector file under the plugin directory";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options) throws
SubCommandException {
+ try {
+ String url = "http://" + config.getHttpAddr() + ":" +
config.getHttpPort() + "/plugin/reload";
+ System.out.printf("Send request to %s%n", url);
+ String result = new RestSender().sendHttpRequest(url, "");
+ System.out.printf("%s%n", result);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + "
command failed", e);
+ }
+ }
+}
Review comment:
op
##########
File path:
rocketmq-connect/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/tools/command/QueryConnectorStatusSubCommand.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.tools.command;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.connect.tools.commom.Config;
+import org.apache.rocketmq.connect.tools.utils.RestSender;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class QueryConnectorStatusSubCommand implements SubCommand {
+
+ private final Config config;
+
+ public QueryConnectorStatusSubCommand(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public String commandName() {
+ return "queryConnectorStatus";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Get Status information for a connector";
+ }
+
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ Option opt = new Option("c", "connectorName", true, "connector name");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options) throws
SubCommandException {
+ try {
+ String connectorName = commandLine.getOptionValue('c').trim();
+ String url = "http://" + config.getHttpAddr() + ":" +
config.getHttpPort() + "/connectors/" + connectorName + "/status";
+ System.out.printf("Send request to %s%n", url);
+ String result = new RestSender().sendHttpRequest(url, "");
+ System.out.printf("%s%n", result);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + "
command failed", e);
+ }
+ }
+}
Review comment:
op
##########
File path:
rocketmq-connect/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/tools/utils/RestSender.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.tools.utils;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.rocketmq.connect.tools.commom.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URLEncoder;
+
+public class RestSender {
+ private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
+
+ public String sendHttpRequest(String baseUrl, String configs) {
+
+ try {
+ CloseableHttpClient client = null;
+ CloseableHttpResponse response = null;
+ try {
+ String encodedConfigs = URLEncoder.encode(configs, "utf-8");
Review comment:
magic value "utf-8" here
##########
File path:
rocketmq-connect/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/tools/command/GetAllocatedInfoCommand.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.tools.command;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.connect.tools.commom.Config;
+import org.apache.rocketmq.connect.tools.utils.RestSender;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class GetAllocatedInfoCommand implements SubCommand {
+
+ private final Config config;
+
+ public GetAllocatedInfoCommand(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public String commandName() {
+ return "getAllocatedInfo";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Get the load information of the current worker";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options) throws
SubCommandException {
+ try {
+ String url = "http://" + config.getHttpAddr() + ":" +
config.getHttpPort() + "/" + commandName();
+ System.out.printf("Send request to %s %n", url);
+ String result = new RestSender().sendHttpRequest(url, "");
+ System.out.printf("%s%n", result);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + "
command failed", e);
+ }
+ }
+}
Review comment:
op
##########
File path:
rocketmq-connect/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/tools/command/GetClusterInfoSubCommand.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.tools.command;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.connect.tools.commom.Config;
+import org.apache.rocketmq.connect.tools.utils.RestSender;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class GetClusterInfoSubCommand implements SubCommand {
+
+ private final Config config;
+
+ public GetClusterInfoSubCommand(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public String commandName() {
+ return "getClusterInfo";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Get cluster information";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options) throws
SubCommandException {
+ try {
+ String url = "http://" + config.getHttpAddr() + ":" +
config.getHttpPort() + "/" + commandName();
+ System.out.printf("Send request to %s%n", url);
+ String result = new RestSender().sendHttpRequest(url, "");
+ System.out.printf("%s%n", result);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + "
command failed", e);
+ }
+ }
+}
Review comment:
op
##########
File path: rocketmq-connect/rocketmq-connect-cli/README.md
##########
@@ -0,0 +1,59 @@
+# rocketmq-connect-CLI Admin
+
+与RocketMQ中的mqadmin类似,使用简洁的CLI命令实现增加,删除,查看connector等功能
+
+在rocketmq-connect\tools目录下,运行`sh connectAdmin`
Review comment:
AFAIK this might not be the newest directory
##########
File path: rocketmq-connect/rocketmq-connect-cli/pom.xml
##########
@@ -0,0 +1,183 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>rocketmq-connect</artifactId>
+ <groupId>org.apache.rocketmq</groupId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>rocketmq-connect-cli</artifactId>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+ <!-- Compiler settings properties -->
+ <maven.compiler.source>1.8</maven.compiler.source>
+ <maven.compiler.target>1.8</maven.compiler.target>
+
+ <!-- RocketMQ Version-->
+ <rocketmq.version>4.7.0</rocketmq.version>
+ </properties>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ <excludes>
+ <exclude>*.xml</exclude>
+ <exclude>connect.conf</exclude>
+ </excludes>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2.1</version>
+ <configuration>
+ <finalName>distribution</finalName>
+ <appendAssemblyId>false</appendAssemblyId>
+ <attach>false</attach>
+ <descriptors>
+ <descriptor>src/main/resources/package.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>versions-maven-plugin</artifactId>
+ <version>2.3</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>clirr-maven-plugin</artifactId>
+ <version>2.7</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
+ <configuration>
+ <source>${maven.compiler.source}</source>
+ <target>${maven.compiler.target}</target>
+ <compilerVersion>${maven.compiler.source}</compilerVersion>
+ <showDeprecation>true</showDeprecation>
+ <showWarnings>true</showWarnings>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.19.1</version>
+ <configuration>
+ <argLine>-Xms512m -Xmx1024m</argLine>
+ <forkMode>always</forkMode>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.6</version>
+ <configuration>
+ <locales>en_US</locales>
+ <outputEncoding>UTF-8</outputEncoding>
+ <inputEncoding>UTF-8</inputEncoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>3.0.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.10.4</version>
+ <configuration>
+ <charset>UTF-8</charset>
+ <locale>en_US</locale>
+ </configuration>
+ <executions>
+ <execution>
+ <id>aggregate</id>
+ <goals>
+ <goal>aggregate</goal>
+ </goals>
+ <phase>site</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>3.0.2</version>
+ <configuration>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>3.0.4</version>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-tools</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+ </dependencies>
+
+
+</project>
Review comment:
Same here, optional
##########
File path:
rocketmq-connect/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/tools/command/QueryConnectorConfigSubCommand.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.tools.command;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import org.apache.rocketmq.connect.tools.commom.Config;
+import org.apache.rocketmq.connect.tools.utils.RestSender;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class QueryConnectorConfigSubCommand implements SubCommand {
+
+ private final Config config;
+
+ public QueryConnectorConfigSubCommand(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public String commandName() {
+ return "queryConnectorConfig";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Get configuration information for a connector";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ Option opt = new Option("c", "connectorName", true, "connector name");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options) throws
SubCommandException {
+ try {
+ String connectorName = commandLine.getOptionValue('c').trim();
+ String url = "http://" + config.getHttpAddr() + ":" +
config.getHttpPort() + "/connectors/" + connectorName + "/config";
+ System.out.printf("Send request to %s%n", url);
+ String result = new RestSender().sendHttpRequest(url, "");
+ System.out.printf("%s%n", result);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + "
command failed", e);
+ }
+ }
+}
Review comment:
op
##########
File path:
rocketmq-connect/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/tools/command/StopAllSubCommand.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.tools.command;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.connect.tools.commom.Config;
+import org.apache.rocketmq.connect.tools.utils.RestSender;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class StopAllSubCommand implements SubCommand {
+
+ private final Config config;
+
+ public StopAllSubCommand(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public String commandName() {
+ return "stopAll";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Stop and delete all Connectors and all configuration
information";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options) throws
SubCommandException {
+ try {
+ String url = "http://" + config.getHttpAddr() + ":" +
config.getHttpPort() + "/connectors/" + commandName();
+ System.out.printf("Send request to %s%n", url);
+ String result = new RestSender().sendHttpRequest(url, "");
+ System.out.printf("%s%n", result);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + "
command failed", e);
+ }
+ }
+}
Review comment:
op
##########
File path:
rocketmq-connect/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/tools/command/StopConnectorSubCommand.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.tools.command;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.connect.tools.commom.Config;
+import org.apache.rocketmq.connect.tools.utils.RestSender;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+
+public class StopConnectorSubCommand implements SubCommand {
+
+ private final Config config;
+
+ public StopConnectorSubCommand(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public String commandName() {
+ return "stopConnector";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Stop a specific connector by connector-name";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ Option opt = new Option("c", "connectorName", true, "connector name");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options) throws
SubCommandException {
+ try {
+ String connectorName = commandLine.getOptionValue('c').trim();
+ String url = "http://" + config.getHttpAddr() + ":" +
config.getHttpPort() + "/connectors/" + connectorName + "/stop";
+ System.out.printf("Send request to %s%n", url);
+ String result = new RestSender().sendHttpRequest(url, "");
+ System.out.printf("%s%n", result);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + "
command failed", e);
+ }
+ }
+}
Review comment:
op
##########
File path:
rocketmq-connect/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/tools/command/StopConnectorSubCommand.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.tools.command;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.connect.tools.commom.Config;
+import org.apache.rocketmq.connect.tools.utils.RestSender;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+
+public class StopConnectorSubCommand implements SubCommand {
+
+ private final Config config;
+
+ public StopConnectorSubCommand(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public String commandName() {
+ return "stopConnector";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Stop a specific connector by connector-name";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ Option opt = new Option("c", "connectorName", true, "connector name");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options) throws
SubCommandException {
Review comment:
This method has some duplicated code. Is it possible to refactor it so
that we can reduce the duplication?
##########
File path:
rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
##########
@@ -136,6 +136,7 @@ public KeyValue configs() {
try {
Collection<SourceDataEntry> toSendEntries =
sourceTask.poll();
if (null != toSendEntries && toSendEntries.size() > 0) {
+ log.info("send records");
Review comment:
I left some very bad log conventions in the runtime and I apologize for
that, but we need to try to make log messages clear ~ This might be a log added
during the debugging process, so we might not need it ~ I'll probably refactor
the logs in externals (as some of them look messy, with some needed logs
missing); it would be great if you're willing to help :)
##########
File path: rocketmq-connect/rocketmq-connect-cli/src/main/resources/connect.conf
##########
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+## Http port for user to access REST API
+httpPort=8081
+
+# RocketMQ Connect Runtime Addr
+httpAddr=localhost
Review comment:
(Optional) newline here
##########
File path:
rocketmq-connect/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/tools/utils/RestSender.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.tools.utils;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.rocketmq.connect.tools.commom.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URLEncoder;
+
+public class RestSender {
+ private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
+
+ public String sendHttpRequest(String baseUrl, String configs) {
+
+ try {
+ CloseableHttpClient client = null;
+ CloseableHttpResponse response = null;
+ try {
+ String encodedConfigs = URLEncoder.encode(configs, "utf-8");
+ HttpGet httpGet = new HttpGet(baseUrl + encodedConfigs);
+ client = HttpClients.createDefault();
+ response = client.execute(httpGet);
+ HttpEntity entity = response.getEntity();
+ String result = EntityUtils.toString(entity);
+ //log.info(result);
+ return result;
+ } finally {
+ if (response != null) {
+ response.close();
+ }
+ if (client != null) {
+ client.close();
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return "";
+ }
+}
Review comment:
(Optional) newline here
##########
File path: rocketmq-connect/rocketmq-connect-cli/src/main/resources/package.xml
##########
@@ -0,0 +1,40 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<assembly xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/assembly-1.0.0.xsd
+ ">
+ <id>package</id>
+ <formats>
+ <format>dir</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>src/main/resources</directory>
+ <outputDirectory>conf</outputDirectory>
+ <filtered>false</filtered>
+ </fileSet>
+ </fileSets>
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory>lib</outputDirectory>
+ <scope>runtime</scope>
+ </dependencySet>
+ </dependencySets>
+</assembly>
Review comment:
(Optional) newline here
##########
File path:
rocketmq-connect/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
##########
@@ -80,7 +80,6 @@ public void doRebalance() {
List<String> curAliveWorkers =
clusterManagementService.getAllAliveWorkers();
Map<String, ConnectKeyValue> curConnectorConfigs =
configManagementService.getConnectorConfigs();
Map<String, List<ConnectKeyValue>> curTaskConfigs =
configManagementService.getTaskConfigs();
- log.info("[ISSUE #2027] The connectorConfigs are:" +
curConnectorConfigs.toString() + " with timestamp :" +
System.currentTimeMillis());
Review comment:
Nice job here~
##########
File path:
rocketmq-connect/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/tools/utils/FileAndPropertyUtil.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.tools.utils;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Properties;
+
+/**
+ * Utils for file and property.
+ */
+public class FileAndPropertyUtil {
+
+ public static String file2String(final String fileName) throws IOException
{
+ File file = new File(fileName);
+ return file2String(file);
+ }
+
+ public static String file2String(final File file) throws IOException {
+ if (file.exists()) {
+ byte[] data = new byte[(int) file.length()];
+ boolean result;
+
+ FileInputStream inputStream = null;
+ try {
+ inputStream = new FileInputStream(file);
+ int len = inputStream.read(data);
+ result = len == data.length;
+ } finally {
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ }
+
+ if (result) {
+ return new String(data);
+ }
+ }
+ return null;
+ }
+
+ public static void properties2Object(final Properties p, final Object
object) {
+ Method[] methods = object.getClass().getMethods();
+ for (Method method : methods) {
+ String mn = method.getName();
+ if (mn.startsWith("set")) {
+ try {
+ String tmp = mn.substring(4);
+ String first = mn.substring(3, 4);
+
+ String key = first.toLowerCase() + tmp;
+ String property = p.getProperty(key);
+ if (property != null) {
+ Class<?>[] pt = method.getParameterTypes();
+ if (pt != null && pt.length > 0) {
+ String cn = pt[0].getSimpleName();
+ Object arg = null;
+ if (cn.equals("int") || cn.equals("Integer")) {
+ arg = Integer.parseInt(property);
+ } else if (cn.equals("long") || cn.equals("Long"))
{
+ arg = Long.parseLong(property);
+ } else if (cn.equals("double") ||
cn.equals("Double")) {
+ arg = Double.parseDouble(property);
+ } else if (cn.equals("boolean") ||
cn.equals("Boolean")) {
+ arg = Boolean.parseBoolean(property);
+ } else if (cn.equals("float") ||
cn.equals("Float")) {
+ arg = Float.parseFloat(property);
+ } else if (cn.equals("String")) {
+ arg = property;
+ } else {
+ continue;
+ }
+ method.invoke(object, arg);
+ }
+ }
+ } catch (Throwable ignored) {
Review comment:
Let's try not to ignore everything here ~ for example, can we catch the
reflection exceptions and log them, or return something ~
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]