This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f568c8f4982 [improve][sql] Remove unnecessary future encapsulation 
(#19959)
f568c8f4982 is described below

commit f568c8f49828be42b8a7e81abea45a80cf4f93f4
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed Apr 5 19:12:30 2023 +0800

    [improve][sql] Remove unnecessary future encapsulation (#19959)
    
    Signed-off-by: tison <[email protected]>
    Co-authored-by: tison <[email protected]>
---
 .../sql/presto/PulsarSqlSchemaInfoProvider.java    | 33 ++++-------
 .../pulsar/sql/presto/TestPulsarRecordCursor.java  | 67 +++++++++++-----------
 2 files changed, 46 insertions(+), 54 deletions(-)

diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
index d8f7db96b83..e2d030d2d7f 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
@@ -27,8 +27,8 @@ import java.nio.charset.StandardCharsets;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
 import org.apache.pulsar.common.naming.TopicName;
@@ -50,10 +50,13 @@ public class PulsarSqlSchemaInfoProvider implements 
SchemaInfoProvider {
 
     private final PulsarAdmin pulsarAdmin;
 
-    private final LoadingCache<BytesSchemaVersion, SchemaInfo> cache = 
CacheBuilder.newBuilder().maximumSize(100000)
-            .expireAfterAccess(30, TimeUnit.MINUTES).build(new 
CacheLoader<BytesSchemaVersion, SchemaInfo>() {
+    private final LoadingCache<BytesSchemaVersion, 
CompletableFuture<SchemaInfo>> cache = CacheBuilder.newBuilder()
+            .maximumSize(100000)
+            .expireAfterAccess(30, TimeUnit.MINUTES)
+            .build(new CacheLoader<>() {
+                @Nonnull
                 @Override
-                public SchemaInfo load(BytesSchemaVersion schemaVersion) 
throws Exception {
+                public CompletableFuture<SchemaInfo> load(@Nonnull 
BytesSchemaVersion schemaVersion) {
                     return loadSchema(schemaVersion);
                 }
             });
@@ -69,7 +72,7 @@ public class PulsarSqlSchemaInfoProvider implements 
SchemaInfoProvider {
             if (null == schemaVersion) {
                 return completedFuture(null);
             }
-            return 
completedFuture(cache.get(BytesSchemaVersion.of(schemaVersion)));
+            return cache.get(BytesSchemaVersion.of(schemaVersion));
         } catch (ExecutionException e) {
             LOG.error("Can't get generic schema for topic {} schema version 
{}",
                     topicName.toString(), new String(schemaVersion, 
StandardCharsets.UTF_8), e);
@@ -79,14 +82,7 @@ public class PulsarSqlSchemaInfoProvider implements 
SchemaInfoProvider {
 
     @Override
     public CompletableFuture<SchemaInfo> getLatestSchema() {
-        try {
-            return completedFuture(pulsarAdmin.schemas()
-                    .getSchemaInfo(topicName.toString()));
-        } catch (PulsarAdminException e) {
-            LOG.error("Can't get current schema for topic {}",
-                    topicName.toString(), e);
-            return FutureUtil.failedFuture(e.getCause());
-        }
+        return pulsarAdmin.schemas().getSchemaInfoAsync(topicName.toString());
     }
 
     @Override
@@ -94,24 +90,19 @@ public class PulsarSqlSchemaInfoProvider implements 
SchemaInfoProvider {
         return topicName.getLocalName();
     }
 
-    private SchemaInfo loadSchema(BytesSchemaVersion bytesSchemaVersion) 
throws PulsarAdminException {
+    private CompletableFuture<SchemaInfo> loadSchema(BytesSchemaVersion 
bytesSchemaVersion) {
         ClassLoader originalContextLoader = 
Thread.currentThread().getContextClassLoader();
         try {
             
Thread.currentThread().setContextClassLoader(InjectionManagerFactory.class.getClassLoader());
             long version = ByteBuffer.wrap(bytesSchemaVersion.get()).getLong();
-            SchemaInfo schemaInfo = 
pulsarAdmin.schemas().getSchemaInfo(topicName.toString(), version);
-            if (schemaInfo == null) {
-                throw new RuntimeException(
-                        "The specific version (" + version + ") schema of the 
topic " + topicName + " is null");
-            }
-            return schemaInfo;
+            return 
pulsarAdmin.schemas().getSchemaInfoAsync(topicName.toString(), version);
         } finally {
             
Thread.currentThread().setContextClassLoader(originalContextLoader);
         }
     }
 
 
-    public static SchemaInfo defaultSchema(){
+    public static SchemaInfo defaultSchema() {
         return Schema.BYTES.getSchemaInfo();
     }
 
diff --git 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
index 7eaa2da498f..40ced8e4f8e 100644
--- 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
+++ 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
@@ -18,6 +18,22 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.airlift.log.Logger;
 import io.netty.buffer.ByteBuf;
@@ -26,7 +42,18 @@ import io.trino.spi.type.DecimalType;
 import io.trino.spi.type.RowType;
 import io.trino.spi.type.Type;
 import io.trino.spi.type.VarcharType;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.math.BigDecimal;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import lombok.Data;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
@@ -57,34 +84,6 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.testng.annotations.Test;
 
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import static java.util.concurrent.CompletableFuture.completedFuture;
-import static 
org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
 public class TestPulsarRecordCursor extends TestPulsarConnector {
 
     private static final Logger log = Logger.get(TestPulsarRecordCursor.class);
@@ -490,8 +489,8 @@ public class TestPulsarRecordCursor extends 
TestPulsarConnector {
         // If the schemaVersion of the message is not null, try to get the 
schema.
         Mockito.when(pulsarSplit.getSchemaType()).thenReturn(SchemaType.AVRO);
         Mockito.when(rawMessage.getSchemaVersion()).thenReturn(new 
LongSchemaVersion(0).bytes());
-        Mockito.when(schemas.getSchemaInfo(anyString(), eq(0L)))
-                .thenReturn(Schema.AVRO(Foo.class).getSchemaInfo());
+        Mockito.when(schemas.getSchemaInfoAsync(anyString(), eq(0L)))
+                
.thenReturn(CompletableFuture.completedFuture(Schema.AVRO(Foo.class).getSchemaInfo()));
         schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, 
pulsarSplit);
         assertEquals(SchemaType.AVRO, schemaInfo.getType());
 
@@ -516,19 +515,21 @@ public class TestPulsarRecordCursor extends 
TestPulsarConnector {
 
         // If the specific version schema is null, throw runtime exception.
         Mockito.when(rawMessage.getSchemaVersion()).thenReturn(new 
LongSchemaVersion(1L).bytes());
-        Mockito.when(schemas.getSchemaInfo(schemaTopic, 1)).thenReturn(null);
+        Mockito.when(schemas.getSchemaInfoAsync(schemaTopic, 1))
+                .thenReturn(CompletableFuture.completedFuture(null));
         try {
             schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, 
pulsarSplit);
             fail("The specific version " + 1 + " schema is null, should 
fail.");
         } catch (InvocationTargetException e) {
             String schemaVersion = BytesSchemaVersion.of(new 
LongSchemaVersion(1L).bytes()).toString();
             assertTrue(e.getCause() instanceof RuntimeException);
-            assertTrue(e.getCause().getMessage().contains("schema of the topic 
" + schemaTopic + " is null"));
+            assertTrue(e.getCause().getMessage().contains("schema of the table 
" + topic + " is null"));
         }
 
         // Get the specific version schema.
         Mockito.when(rawMessage.getSchemaVersion()).thenReturn(new 
LongSchemaVersion(2L).bytes());
-        Mockito.when(schemas.getSchemaInfo(schemaTopic, 
2)).thenReturn(Schema.AVRO(Foo.class).getSchemaInfo());
+        Mockito.when(schemas.getSchemaInfoAsync(schemaTopic, 2))
+                
.thenReturn(CompletableFuture.completedFuture(Schema.AVRO(Foo.class).getSchemaInfo()));
         schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, 
pulsarSplit);
         assertEquals(Schema.AVRO(Foo.class).getSchemaInfo(), schemaInfo);
     }

Reply via email to