ruanhang1993 commented on a change in pull request #18516:
URL: https://github.com/apache/flink/pull/18516#discussion_r805231841



##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContext.java
##########
@@ -58,20 +58,20 @@
 public class KafkaSourceExternalContext implements 
DataStreamSourceExternalContext<String> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaSourceExternalContext.class);
-    private static final String TOPIC_NAME_PREFIX = "kafka-test-topic-";
-    private static final Pattern TOPIC_NAME_PATTERN = 
Pattern.compile(TOPIC_NAME_PREFIX + ".*");
-    private static final String GROUP_ID_PREFIX = 
"kafka-source-external-context-";
+    protected static final String TOPIC_NAME_PREFIX = "kafka-test-topic-";
+    protected static final Pattern TOPIC_NAME_PATTERN = 
Pattern.compile(TOPIC_NAME_PREFIX + ".*");
+    protected static final String GROUP_ID_PREFIX = 
"kafka-source-external-context-";
     private static final int NUM_RECORDS_UPPER_BOUND = 500;
     private static final int NUM_RECORDS_LOWER_BOUND = 100;
 
     private final List<URL> connectorJarPaths;
-    private final String bootstrapServers;
-    private final String topicName;
-    private final SplitMappingMode splitMappingMode;
-    private final AdminClient adminClient;
+    protected final String bootstrapServers;
+    protected final String topicName;
+    protected final SplitMappingMode splitMappingMode;
+    protected final AdminClient adminClient;

Review comment:
       This is invoked in another PR. I will revert this.

##########
File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainerTestEnvironment.java
##########
@@ -68,6 +71,7 @@ public FlinkContainerTestEnvironment(
         config.set(HEARTBEAT_INTERVAL, 1000L);
         config.set(HEARTBEAT_TIMEOUT, 5000L);
         config.set(SLOT_REQUEST_TIMEOUT, 10000L);
+        config.set(METRIC_FETCHER_UPDATE_INTERVAL, 1000L);

Review comment:
       fixed

##########
File path: 
flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java
##########
@@ -53,9 +61,12 @@
     private boolean isStarted = false;
 
     public MiniClusterTestEnvironment() {
+        Configuration conf = new Configuration();
+        conf.set(METRIC_FETCHER_UPDATE_INTERVAL, 1000L);

Review comment:
       fixed

##########
File path: 
flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java
##########
@@ -71,7 +82,18 @@ public MiniClusterTestEnvironment() {
     @Override
     public StreamExecutionEnvironment createExecutionEnvironment(
             TestEnvironmentSettings envOptions) {
-        return StreamExecutionEnvironment.getExecutionEnvironment();
+        Configuration configuration = new Configuration();
+        if (envOptions.getSavepointRestorePath() != null) {
+            configuration.set(SAVEPOINT_PATH, 
envOptions.getSavepointRestorePath());

Review comment:
       fixed

##########
File path: 
flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/MetricQueryer.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.utils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** The queryer is used to get job metrics by rest API. */

Review comment:
       fixed

##########
File path: 
flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/MetricQueryer.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.utils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** The queryer is used to get job metrics by rest API. */
+public class MetricQueryer {
+    private static final Logger LOG = 
LoggerFactory.getLogger(MetricQueryer.class);
+    private RestClient restClient;
+
+    public MetricQueryer(Configuration configuration, Executor executor)
+            throws ConfigurationException {
+        restClient = new RestClient(configuration, executor);
+    }
+
+    public JobDetailsInfo getJobDetails(TestEnvironment.Endpoint endpoint, 
JobID jobId)
+            throws Exception {
+        String jmAddress = endpoint.getAddress();
+        int jmPort = endpoint.getPort();
+
+        final JobMessageParameters params = new JobMessageParameters();
+        params.jobPathParameter.resolve(jobId);
+
+        return restClient
+                .sendRequest(
+                        jmAddress,
+                        jmPort,
+                        JobDetailsHeaders.getInstance(),
+                        params,
+                        EmptyRequestBody.getInstance())
+                .get(30, TimeUnit.SECONDS);
+    }
+
+    public AggregatedMetricsResponseBody getMetricList(
+            TestEnvironment.Endpoint endpoint, JobID jobId, JobVertexID 
vertexId) throws Exception {
+        AggregatedSubtaskMetricsParameters subtaskMetricsParameters =
+                new AggregatedSubtaskMetricsParameters();
+        Iterator<MessagePathParameter<?>> pathParams =
+                subtaskMetricsParameters.getPathParameters().iterator();
+        ((JobIDPathParameter) pathParams.next()).resolve(jobId);
+        ((JobVertexIdPathParameter) pathParams.next()).resolve(vertexId);
+        return restClient
+                .sendRequest(
+                        endpoint.getAddress(),
+                        endpoint.getPort(),
+                        AggregatedSubtaskMetricsHeaders.getInstance(),
+                        subtaskMetricsParameters,
+                        EmptyRequestBody.getInstance())
+                .get(30, TimeUnit.SECONDS);
+    }
+
+    public AggregatedMetricsResponseBody getMetrics(
+            TestEnvironment.Endpoint endpoint, JobID jobId, JobVertexID 
vertexId, String filters)
+            throws Exception {
+        AggregatedSubtaskMetricsParameters subtaskMetricsParameters =
+                new AggregatedSubtaskMetricsParameters();
+        Iterator<MessagePathParameter<?>> pathParams =
+                subtaskMetricsParameters.getPathParameters().iterator();
+        ((JobIDPathParameter) pathParams.next()).resolve(jobId);
+        ((JobVertexIdPathParameter) pathParams.next()).resolve(vertexId);
+        MetricsFilterParameter metricFilter =
+                (MetricsFilterParameter)
+                        
subtaskMetricsParameters.getQueryParameters().iterator().next();
+        metricFilter.resolveFromString(filters);
+
+        return restClient
+                .sendRequest(
+                        endpoint.getAddress(),
+                        endpoint.getPort(),
+                        AggregatedSubtaskMetricsHeaders.getInstance(),
+                        subtaskMetricsParameters,
+                        EmptyRequestBody.getInstance())
+                .get(30, TimeUnit.SECONDS);
+    }
+
+    public Double getMetricByRestApi(

Review comment:
       fixed

##########
File path: 
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
##########
@@ -40,6 +42,9 @@
     @TestExternalSystem
     PulsarTestEnvironment pulsar = new 
PulsarTestEnvironment(PulsarRuntime.mock());
 
+    @Semantic
+    CheckpointingMode[] semantics = new CheckpointingMode[] 
{CheckpointingMode.EXACTLY_ONCE};

Review comment:
       Actually, these semantic tests are contained in the sink tests for the Kafka 
connector. There are no ITCases in the source now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to