This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f568c8f4982 [improve][sql] Remove unnecessary future encapsulation
(#19959)
f568c8f4982 is described below
commit f568c8f49828be42b8a7e81abea45a80cf4f93f4
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed Apr 5 19:12:30 2023 +0800
[improve][sql] Remove unnecessary future encapsulation (#19959)
Signed-off-by: tison <[email protected]>
Co-authored-by: tison <[email protected]>
---
.../sql/presto/PulsarSqlSchemaInfoProvider.java | 33 ++++-------
.../pulsar/sql/presto/TestPulsarRecordCursor.java | 67 +++++++++++-----------
2 files changed, 46 insertions(+), 54 deletions(-)
diff --git
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
index d8f7db96b83..e2d030d2d7f 100644
---
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
+++
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
@@ -27,8 +27,8 @@ import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.common.naming.TopicName;
@@ -50,10 +50,13 @@ public class PulsarSqlSchemaInfoProvider implements
SchemaInfoProvider {
private final PulsarAdmin pulsarAdmin;
- private final LoadingCache<BytesSchemaVersion, SchemaInfo> cache =
CacheBuilder.newBuilder().maximumSize(100000)
- .expireAfterAccess(30, TimeUnit.MINUTES).build(new
CacheLoader<BytesSchemaVersion, SchemaInfo>() {
+ private final LoadingCache<BytesSchemaVersion,
CompletableFuture<SchemaInfo>> cache = CacheBuilder.newBuilder()
+ .maximumSize(100000)
+ .expireAfterAccess(30, TimeUnit.MINUTES)
+ .build(new CacheLoader<>() {
+ @Nonnull
@Override
- public SchemaInfo load(BytesSchemaVersion schemaVersion)
throws Exception {
+ public CompletableFuture<SchemaInfo> load(@Nonnull
BytesSchemaVersion schemaVersion) {
return loadSchema(schemaVersion);
}
});
@@ -69,7 +72,7 @@ public class PulsarSqlSchemaInfoProvider implements
SchemaInfoProvider {
if (null == schemaVersion) {
return completedFuture(null);
}
- return
completedFuture(cache.get(BytesSchemaVersion.of(schemaVersion)));
+ return cache.get(BytesSchemaVersion.of(schemaVersion));
} catch (ExecutionException e) {
LOG.error("Can't get generic schema for topic {} schema version
{}",
topicName.toString(), new String(schemaVersion,
StandardCharsets.UTF_8), e);
@@ -79,14 +82,7 @@ public class PulsarSqlSchemaInfoProvider implements
SchemaInfoProvider {
@Override
public CompletableFuture<SchemaInfo> getLatestSchema() {
- try {
- return completedFuture(pulsarAdmin.schemas()
- .getSchemaInfo(topicName.toString()));
- } catch (PulsarAdminException e) {
- LOG.error("Can't get current schema for topic {}",
- topicName.toString(), e);
- return FutureUtil.failedFuture(e.getCause());
- }
+ return pulsarAdmin.schemas().getSchemaInfoAsync(topicName.toString());
}
@Override
@@ -94,24 +90,19 @@ public class PulsarSqlSchemaInfoProvider implements
SchemaInfoProvider {
return topicName.getLocalName();
}
- private SchemaInfo loadSchema(BytesSchemaVersion bytesSchemaVersion)
throws PulsarAdminException {
+ private CompletableFuture<SchemaInfo> loadSchema(BytesSchemaVersion
bytesSchemaVersion) {
ClassLoader originalContextLoader =
Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(InjectionManagerFactory.class.getClassLoader());
long version = ByteBuffer.wrap(bytesSchemaVersion.get()).getLong();
- SchemaInfo schemaInfo =
pulsarAdmin.schemas().getSchemaInfo(topicName.toString(), version);
- if (schemaInfo == null) {
- throw new RuntimeException(
- "The specific version (" + version + ") schema of the
topic " + topicName + " is null");
- }
- return schemaInfo;
+ return
pulsarAdmin.schemas().getSchemaInfoAsync(topicName.toString(), version);
} finally {
Thread.currentThread().setContextClassLoader(originalContextLoader);
}
}
- public static SchemaInfo defaultSchema(){
+ public static SchemaInfo defaultSchema() {
return Schema.BYTES.getSchemaInfo();
}
diff --git
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
index 7eaa2da498f..40ced8e4f8e 100644
---
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
+++
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
@@ -18,6 +18,22 @@
*/
package org.apache.pulsar.sql.presto;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airlift.log.Logger;
import io.netty.buffer.ByteBuf;
@@ -26,7 +42,18 @@ import io.trino.spi.type.DecimalType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarcharType;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.math.BigDecimal;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import lombok.Data;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
@@ -57,34 +84,6 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.annotations.Test;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import static java.util.concurrent.CompletableFuture.completedFuture;
-import static
org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
public class TestPulsarRecordCursor extends TestPulsarConnector {
private static final Logger log = Logger.get(TestPulsarRecordCursor.class);
@@ -490,8 +489,8 @@ public class TestPulsarRecordCursor extends
TestPulsarConnector {
// If the schemaVersion of the message is not null, try to get the
schema.
Mockito.when(pulsarSplit.getSchemaType()).thenReturn(SchemaType.AVRO);
Mockito.when(rawMessage.getSchemaVersion()).thenReturn(new
LongSchemaVersion(0).bytes());
- Mockito.when(schemas.getSchemaInfo(anyString(), eq(0L)))
- .thenReturn(Schema.AVRO(Foo.class).getSchemaInfo());
+ Mockito.when(schemas.getSchemaInfoAsync(anyString(), eq(0L)))
+
.thenReturn(CompletableFuture.completedFuture(Schema.AVRO(Foo.class).getSchemaInfo()));
schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor,
pulsarSplit);
assertEquals(SchemaType.AVRO, schemaInfo.getType());
@@ -516,19 +515,21 @@ public class TestPulsarRecordCursor extends
TestPulsarConnector {
// If the specific version schema is null, throw runtime exception.
Mockito.when(rawMessage.getSchemaVersion()).thenReturn(new
LongSchemaVersion(1L).bytes());
- Mockito.when(schemas.getSchemaInfo(schemaTopic, 1)).thenReturn(null);
+ Mockito.when(schemas.getSchemaInfoAsync(schemaTopic, 1))
+ .thenReturn(CompletableFuture.completedFuture(null));
try {
schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor,
pulsarSplit);
fail("The specific version " + 1 + " schema is null, should
fail.");
} catch (InvocationTargetException e) {
String schemaVersion = BytesSchemaVersion.of(new
LongSchemaVersion(1L).bytes()).toString();
assertTrue(e.getCause() instanceof RuntimeException);
- assertTrue(e.getCause().getMessage().contains("schema of the topic
" + schemaTopic + " is null"));
+ assertTrue(e.getCause().getMessage().contains("schema of the table
" + topic + " is null"));
}
// Get the specific version schema.
Mockito.when(rawMessage.getSchemaVersion()).thenReturn(new
LongSchemaVersion(2L).bytes());
- Mockito.when(schemas.getSchemaInfo(schemaTopic,
2)).thenReturn(Schema.AVRO(Foo.class).getSchemaInfo());
+ Mockito.when(schemas.getSchemaInfoAsync(schemaTopic, 2))
+
.thenReturn(CompletableFuture.completedFuture(Schema.AVRO(Foo.class).getSchemaInfo()));
schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor,
pulsarSplit);
assertEquals(Schema.AVRO(Foo.class).getSchemaInfo(), schemaInfo);
}