This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new bbf68c6 [FLINK-20410] Retry querying for schema in the schema
registry e2e test.
bbf68c6 is described below
commit bbf68c656a352129ab621efacdf0ab561071d3de
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Mon Nov 30 10:36:25 2020 +0100
[FLINK-20410] Retry querying for schema in the schema registry e2e test.
---
.../util/kafka/SQLClientSchemaRegistryITCase.java | 19 ++++++++++++++++++-
1 file changed, 18 insertions(+), 1 deletion(-)
diff --git
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
index 1906b58..34500d3 100644
---
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
+++
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.tests.util.kafka;
+import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.tests.util.categories.TravisGroup1;
import org.apache.flink.tests.util.flink.FlinkContainer;
@@ -26,6 +27,7 @@ import
org.apache.flink.tests.util.kafka.containers.SchemaRegistryContainer;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
@@ -42,6 +44,7 @@ import org.testcontainers.utility.DockerImageName;
import java.io.IOException;
import java.nio.file.Path;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -192,7 +195,7 @@ public class SQLClientSchemaRegistryITCase {
executeSqlStatements(sqlLines);
- List<Integer> versions =
registryClient.getAllVersions(behaviourSubject);
+ List<Integer> versions = getAllVersions(behaviourSubject);
assertThat(versions.size(), equalTo(1));
List<Object> userBehaviors = kafkaClient.readMessages(
1,
@@ -217,6 +220,20 @@ public class SQLClientSchemaRegistryITCase {
));
}
+ private List<Integer> getAllVersions(String behaviourSubject) throws
Exception {
+ Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30));
+ Exception ex = new IllegalStateException(
+ "Could not query schema registry. Negative deadline
provided.");
+ while (deadline.hasTimeLeft()) {
+ try {
+ return
registryClient.getAllVersions(behaviourSubject);
+ } catch (RestClientException e) {
+ ex = e;
+ }
+ }
+ throw ex;
+ }
+
private void executeSqlStatements(List<String> sqlLines) throws
Exception {
flink.submitSQLJob(new
SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
.addJars(sqlAvroJar, sqlAvroRegistryJar,
sqlConnectorKafkaJar, sqlToolBoxJar)