jerrypeng closed pull request #2515: support nested fields in Pulsar presto 
connector
URL: https://github.com/apache/incubator-pulsar/pull/2515
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff once such a pull request is merged,
the diff is reproduced below for the sake of provenance:

Because this is a foreign pull request (i.e., one opened from a fork), the
diff is included below; GitHub would not otherwise display it after the merge:

diff --git 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java 
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
index 36d4dc8328..e5cb79c9a0 100644
--- 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
+++ 
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
@@ -82,13 +82,6 @@
         private Boolean defaultProfileImage;
     }
     @Data
-    public static class Url {
-        private String url;
-        private String expandedUrl;
-        private String displayUrl;
-        private List<Long> indices = null;
-    }
-    @Data
     public static class RetweetedStatus {
         private String createdAt;
         private Long id;
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
index f8cec40e7b..402a36e4f4 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
@@ -55,9 +55,23 @@ public Object deserialize(byte[] bytes) {
     public Object extractField(int index, Object currentRecord) {
         try {
             GenericRecord record = (GenericRecord) currentRecord;
-            return 
record.get(this.columnHandles.get(index).getPositionIndex());
+            PulsarColumnHandle pulsarColumnHandle = 
this.columnHandles.get(index);
+            Integer[] positionIndices = 
pulsarColumnHandle.getPositionIndices();
+            Object curr = record.get(positionIndices[0]);
+            if (curr == null) {
+                return null;
+            }
+            if (positionIndices.length > 0) {
+                for (int i = 1 ; i < positionIndices.length; i++) {
+                    curr = ((GenericRecord) curr).get(positionIndices[i]);
+                    if (curr == null) {
+                        return null;
+                    }
+                }
+            }
+            return curr;
         } catch (Exception ex) {
-            log.error(ex);
+            log.debug(ex,"%s", ex);
         }
         return null;
     }
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
index f72ad0efee..aed7e0af58 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
@@ -25,6 +25,7 @@
 import org.apache.pulsar.shade.com.google.gson.JsonObject;
 import org.apache.pulsar.shade.com.google.gson.JsonParser;
 
+import java.util.Arrays;
 import java.util.List;
 
 import static com.facebook.presto.spi.type.IntegerType.INTEGER;
@@ -54,10 +55,19 @@ public Object extractField(int index, Object currentRecord) 
{
         try {
             JsonObject jsonObject = (JsonObject) currentRecord;
             PulsarColumnHandle pulsarColumnHandle = columnHandles.get(index);
-            JsonElement field = jsonObject.get(pulsarColumnHandle.getName());
+
+            String[] fieldNames = pulsarColumnHandle.getFieldNames();
+            JsonElement field = jsonObject.get(fieldNames[0]);
             if (field.isJsonNull()) {
                 return null;
             }
+            for (int i = 1; i < fieldNames.length ; i++) {
+                field = field.getAsJsonObject().get(fieldNames[i]);
+                if (field.isJsonNull()) {
+                    return null;
+                }
+            }
+
             Type type = pulsarColumnHandle.getType();
             Class<?> javaType = type.getJavaType();
 
@@ -81,7 +91,7 @@ public Object extractField(int index, Object currentRecord) {
                 return null;
             }
         } catch (Exception ex) {
-            log.error(ex);
+            log.debug(ex,"%s", ex);
         }
         return null;
     }
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java
index def8cc39a5..f98e864f1a 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java
@@ -24,9 +24,8 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.util.Objects;
+import java.util.Arrays;
 
-import static com.google.common.base.MoreObjects.toStringHelper;
 import static java.util.Objects.requireNonNull;
 
 public class PulsarColumnHandle implements ColumnHandle {
@@ -53,10 +52,9 @@
      */
     private final boolean internal;
 
-    /**
-     * The index of the field in the schema associated with this column.
-     */
-    private Integer positionIndex;
+    private final String[] fieldNames;
+
+    private final Integer[] positionIndices;
 
     @JsonCreator
     public PulsarColumnHandle(
@@ -65,13 +63,15 @@ public PulsarColumnHandle(
             @JsonProperty("type") Type type,
             @JsonProperty("hidden") boolean hidden,
             @JsonProperty("internal") boolean internal,
-            @JsonProperty("positionIndex") Integer positionIndex) {
+            @JsonProperty("fieldNames") String[] fieldNames,
+            @JsonProperty("positionIndices") Integer[] positionIndices) {
         this.connectorId = requireNonNull(connectorId, "connectorId is null");
         this.name = requireNonNull(name, "name is null");
         this.type = requireNonNull(type, "type is null");
         this.hidden = hidden;
         this.internal = internal;
-        this.positionIndex = positionIndex;
+        this.fieldNames = fieldNames;
+        this.positionIndices = positionIndices;
     }
 
     @JsonProperty
@@ -100,8 +100,13 @@ public boolean isInternal() {
     }
 
     @JsonProperty
-    public Integer getPositionIndex() {
-        return positionIndex;
+    public String[] getFieldNames() {
+        return fieldNames;
+    }
+
+    @JsonProperty
+    public Integer[] getPositionIndices() {
+        return positionIndices;
     }
 
 
@@ -110,37 +115,43 @@ ColumnMetadata getColumnMetadata() {
     }
 
     @Override
-    public int hashCode() {
-        return Objects.hash(connectorId, name, type, hidden, internal);
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        PulsarColumnHandle that = (PulsarColumnHandle) o;
+
+        if (hidden != that.hidden) return false;
+        if (internal != that.internal) return false;
+        if (connectorId != null ? !connectorId.equals(that.connectorId) : 
that.connectorId != null) return false;
+        if (name != null ? !name.equals(that.name) : that.name != null) return 
false;
+        if (type != null ? !type.equals(that.type) : that.type != null) return 
false;
+        if (!Arrays.deepEquals(fieldNames, that.fieldNames)) return false;
+        return Arrays.deepEquals(positionIndices, that.positionIndices);
     }
 
     @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-
-        PulsarColumnHandle other = (PulsarColumnHandle) obj;
-        return Objects.equals(this.connectorId, other.connectorId) &&
-                Objects.equals(this.name, other.name) &&
-                Objects.equals(this.type, other.type) &&
-                Objects.equals(this.hidden, other.hidden) &&
-                Objects.equals(this.internal, other.internal) &&
-                Objects.equals(this.positionIndex, other.positionIndex);
+    public int hashCode() {
+        int result = connectorId != null ? connectorId.hashCode() : 0;
+        result = 31 * result + (name != null ? name.hashCode() : 0);
+        result = 31 * result + (type != null ? type.hashCode() : 0);
+        result = 31 * result + (hidden ? 1 : 0);
+        result = 31 * result + (internal ? 1 : 0);
+        result = 31 * result + Arrays.hashCode(fieldNames);
+        result = 31 * result + Arrays.hashCode(positionIndices);
+        return result;
     }
 
     @Override
     public String toString() {
-        return toStringHelper(this)
-                .add("connectorId", connectorId)
-                .add("name", name)
-                .add("type", type)
-                .add("hidden", hidden)
-                .add("internal", internal)
-                .add("positionIndex", positionIndex)
-                .toString();
+        return "PulsarColumnHandle{" +
+                "connectorId='" + connectorId + '\'' +
+                ", name='" + name + '\'' +
+                ", type=" + type +
+                ", hidden=" + hidden +
+                ", internal=" + internal +
+                ", fieldNames=" + Arrays.toString(fieldNames) +
+                ", positionIndices=" + Arrays.toString(positionIndices) +
+                '}';
     }
 }
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java
index 2e23c8788a..9a484ba1a2 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java
@@ -21,19 +21,25 @@
 import com.facebook.presto.spi.ColumnMetadata;
 import com.facebook.presto.spi.type.Type;
 
+import java.util.Arrays;
+import java.util.List;
+
 public class PulsarColumnMetadata extends ColumnMetadata {
 
     private boolean isInternal;
-    private Integer positionIndex;
     // need this because presto ColumnMetadata saves name in lowercase
     private String nameWithCase;
+    private String[] fieldNames;
+    private Integer[] positionIndices;
 
     public PulsarColumnMetadata(String name, Type type, String comment, String 
extraInfo,
-                                boolean hidden, boolean isInternal, Integer 
positionIndex) {
+                                boolean hidden, boolean isInternal,
+                                String[] fieldNames, Integer[] 
positionIndices) {
         super(name, type, comment, extraInfo, hidden);
         this.nameWithCase = name;
         this.isInternal = isInternal;
-        this.positionIndex = positionIndex;
+        this.fieldNames = fieldNames;
+        this.positionIndices = positionIndices;
     }
 
     public String getNameWithCase() {
@@ -44,30 +50,22 @@ public boolean isInternal() {
         return isInternal;
     }
 
-    public int getPositionIndex() {
-        return positionIndex;
+    public String[] getFieldNames() {
+        return fieldNames;
     }
 
+    public Integer[] getPositionIndices() {
+        return positionIndices;
+    }
 
     @Override
     public String toString() {
-        StringBuilder sb = new StringBuilder("PulsarColumnMetadata{");
-        sb.append("name='").append(getName()).append('\'');
-        sb.append(", type=").append(getType());
-        if (getComment() != null) {
-            sb.append(", comment='").append(getComment()).append('\'');
-        }
-        if (getExtraInfo() != null) {
-            sb.append(", extraInfo='").append(getExtraInfo()).append('\'');
-        }
-        if (isHidden()) {
-            sb.append(", hidden");
-        }
-        if (isInternal()) {
-            sb.append(", internal");
-        }
-        sb.append('}');
-        return sb.toString();
+        return "PulsarColumnMetadata{" +
+                "isInternal=" + isInternal +
+                ", nameWithCase='" + nameWithCase + '\'' +
+                ", fieldNames=" + Arrays.toString(fieldNames) +
+                ", positionIndices=" + Arrays.toString(positionIndices) +
+                '}';
     }
 
     @Override
@@ -78,13 +76,19 @@ public boolean equals(Object o) {
 
         PulsarColumnMetadata that = (PulsarColumnMetadata) o;
 
-        return isInternal == that.isInternal;
+        if (isInternal != that.isInternal) return false;
+        if (nameWithCase != null ? !nameWithCase.equals(that.nameWithCase) : 
that.nameWithCase != null) return false;
+        if (!Arrays.deepEquals(fieldNames, that.fieldNames)) return false;
+        return Arrays.deepEquals(positionIndices, that.positionIndices);
     }
 
     @Override
     public int hashCode() {
         int result = super.hashCode();
         result = 31 * result + (isInternal ? 1 : 0);
+        result = 31 * result + (nameWithCase != null ? nameWithCase.hashCode() 
: 0);
+        result = 31 * result + Arrays.hashCode(fieldNames);
+        result = 31 * result + Arrays.hashCode(positionIndices);
         return result;
     }
 }
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
index 8341775971..76da585c5b 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
@@ -179,11 +179,11 @@ PulsarColumnHandle getColumnHandle(String connectorId, 
boolean hidden) {
                 getName(),
                 getType(),
                 hidden,
-                true, null);
+                true, null, null);
     }
 
     PulsarColumnMetadata getColumnMetadata(boolean hidden) {
-        return new PulsarColumnMetadata(name, type, comment, null, hidden, 
true, null);
+        return new PulsarColumnMetadata(name, type, comment, null, hidden, 
true, null, null);
     }
 
     public static Set<PulsarInternalColumn> getInternalFields() {
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
index 6e30ea5264..f283ea24c2 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
@@ -32,14 +32,11 @@
 import com.facebook.presto.spi.SchemaTablePrefix;
 import com.facebook.presto.spi.TableNotFoundException;
 import com.facebook.presto.spi.connector.ConnectorMetadata;
-import com.facebook.presto.spi.predicate.Domain;
-import com.facebook.presto.spi.predicate.Range;
 import com.facebook.presto.spi.type.BigintType;
 import com.facebook.presto.spi.type.BooleanType;
 import com.facebook.presto.spi.type.DoubleType;
 import com.facebook.presto.spi.type.IntegerType;
 import com.facebook.presto.spi.type.RealType;
-import com.facebook.presto.spi.type.SqlTimestampWithTimeZone;
 import com.facebook.presto.spi.type.Type;
 import com.facebook.presto.spi.type.VarbinaryType;
 import com.facebook.presto.spi.type.VarcharType;
@@ -60,23 +57,23 @@
 import org.apache.pulsar.common.schema.SchemaInfo;
 
 import javax.inject.Inject;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.Stack;
 import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 
-import static 
org.apache.pulsar.sql.presto.PulsarHandleResolver.convertColumnHandle;
-import static 
org.apache.pulsar.sql.presto.PulsarHandleResolver.convertTableHandle;
 import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
 import static com.facebook.presto.spi.type.DateType.DATE;
 import static com.facebook.presto.spi.type.TimeType.TIME;
 import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
 import static java.util.Objects.requireNonNull;
+import static 
org.apache.pulsar.sql.presto.PulsarHandleResolver.convertColumnHandle;
+import static 
org.apache.pulsar.sql.presto.PulsarHandleResolver.convertTableHandle;
 
 public class PulsarMetadata implements ConnectorMetadata {
 
@@ -183,7 +180,8 @@ public void accept(ColumnMetadata columnMetadata) {
                         pulsarColumnMetadata.getType(),
                         pulsarColumnMetadata.isHidden(),
                         pulsarColumnMetadata.isInternal(),
-                        pulsarColumnMetadata.getPositionIndex());
+                        pulsarColumnMetadata.getFieldNames(),
+                        pulsarColumnMetadata.getPositionIndices());
 
                 columnHandles.put(
                         columnMetadata.getName(),
@@ -289,11 +287,8 @@ private ConnectorTableMetadata 
getTableMetadata(SchemaTableName schemaTableName,
 
         ImmutableList.Builder<ColumnMetadata> builder = 
ImmutableList.builder();
 
-        List<Schema.Field> fields = schema.getFields();
-        for (int i = 0; i < fields.size(); i++) {
-            Schema.Field field = fields.get(i);
-            builder.addAll(getColumns(field.name(), field.schema(), i));
-        }
+        builder.addAll(getColumns(null, schema, new HashSet<>(), new 
Stack<>(), new Stack<>()));
+
         if (withInternalColumns) {
             PulsarInternalColumn.getInternalFields().stream().forEach(new 
Consumer<PulsarInternalColumn>() {
                 @Override
@@ -306,15 +301,21 @@ public void accept(PulsarInternalColumn 
pulsarInternalColumn) {
         return new ConnectorTableMetadata(schemaTableName, builder.build());
     }
 
-    // TODO support nested fields
-    private List<PulsarColumnMetadata> getColumns(String name, Schema 
fieldSchema, int index) {
+
+    @VisibleForTesting
+    static List<PulsarColumnMetadata> getColumns(String fieldName, Schema 
fieldSchema,
+                                                  Set<String> fieldTypes,
+                                                  Stack<String> fieldNames,
+                                                  Stack<Integer> 
positionIndices) {
 
         List<PulsarColumnMetadata> columnMetadataList = new LinkedList<>();
 
         if (isPrimitiveType(fieldSchema.getType())) {
-            columnMetadataList.add(new PulsarColumnMetadata(name,
+            columnMetadataList.add(new PulsarColumnMetadata(fieldName,
                     convertType(fieldSchema.getType(), 
fieldSchema.getLogicalType()),
-                    null, null, false, false, index));
+                    null, null, false, false,
+                    fieldNames.toArray(new String[fieldNames.size()]),
+                    positionIndices.toArray(new 
Integer[positionIndices.size()])));
         } else if (fieldSchema.getType() == Schema.Type.UNION) {
             boolean canBeNull = false;
             for (Schema type : fieldSchema.getTypes()) {
@@ -322,22 +323,53 @@ public void accept(PulsarInternalColumn 
pulsarInternalColumn) {
                     PulsarColumnMetadata columnMetadata;
                     if (type.getType() != Schema.Type.NULL) {
                         if (!canBeNull) {
-                            columnMetadata = new PulsarColumnMetadata(name,
+                            columnMetadata = new 
PulsarColumnMetadata(fieldName,
                                     convertType(type.getType(), 
type.getLogicalType()),
-                                    null, null, false, false, index);
+                                    null, null, false, false,
+                                    fieldNames.toArray(new 
String[fieldNames.size()]),
+                                    positionIndices.toArray(new 
Integer[positionIndices.size()]));
                         } else {
-                            columnMetadata = new PulsarColumnMetadata(name,
+                            columnMetadata = new 
PulsarColumnMetadata(fieldName,
                                     convertType(type.getType(), 
type.getLogicalType()),
-                                    "field can be null", null, false, false, 
index);
+                                    "field can be null", null, false, false,
+                                    fieldNames.toArray(new 
String[fieldNames.size()]),
+                                    positionIndices.toArray(new 
Integer[positionIndices.size()]));
                         }
                         columnMetadataList.add(columnMetadata);
                     } else {
                         canBeNull = true;
                     }
+                } else {
+                    List<PulsarColumnMetadata> columns = getColumns(fieldName, 
type, fieldTypes, fieldNames, positionIndices);
+                    columnMetadataList.addAll(columns);
+
                 }
             }
         } else if (fieldSchema.getType() == Schema.Type.RECORD) {
+            // check if we have seen this type before to prevent cyclic class 
definitions.
+            if (!fieldTypes.contains(fieldSchema.getFullName())) {
+                // add to types seen so far in traversal
+                fieldTypes.add(fieldSchema.getFullName());
+                List<Schema.Field> fields = fieldSchema.getFields();
+                for (int i = 0; i < fields.size(); i++) {
+                    Schema.Field field = fields.get(i);
+                    fieldNames.push(field.name());
+                    positionIndices.push(i);
+                    List<PulsarColumnMetadata> columns;
+                    if (fieldName == null) {
+                        columns = getColumns(field.name(), field.schema(), 
fieldTypes, fieldNames, positionIndices);
+                    } else {
+                        columns = getColumns(String.format("%s.%s", fieldName, 
field.name()), field.schema(), fieldTypes, fieldNames, positionIndices);
 
+                    }
+                    positionIndices.pop();
+                    fieldNames.pop();
+                    columnMetadataList.addAll(columns);
+                }
+                fieldTypes.remove(fieldSchema.getFullName());
+            } else {
+                log.debug("Already seen type: %s", fieldSchema.getFullName());
+            }
         } else if (fieldSchema.getType() == Schema.Type.ARRAY) {
 
         } else if (fieldSchema.getType() == Schema.Type.MAP) {
@@ -383,7 +415,8 @@ static Type convertType(Schema.Type avroType, LogicalType 
logicalType) {
         }
     }
 
-    private boolean isPrimitiveType(Schema.Type type) {
+    @VisibleForTesting
+    static boolean isPrimitiveType(Schema.Type type) {
         return Schema.Type.NULL == type
                 || Schema.Type.BOOLEAN == type
                 || Schema.Type.INT == type
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 8593c55ccf..ef56e6ccc1 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -20,7 +20,6 @@
 
 import com.facebook.presto.spi.PrestoException;
 import com.facebook.presto.spi.RecordCursor;
-import com.facebook.presto.spi.type.TimestampWithTimeZoneType;
 import com.facebook.presto.spi.type.Type;
 import com.facebook.presto.spi.type.VarbinaryType;
 import com.facebook.presto.spi.type.VarcharType;
@@ -45,19 +44,14 @@
 import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration;
 
 import java.io.IOException;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.util.Date;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
-import java.util.concurrent.TimeUnit;
 
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
 import static com.facebook.presto.spi.type.BigintType.BIGINT;
 import static 
com.facebook.presto.spi.type.DateTimeEncoding.packDateTimeWithZone;
-import static com.facebook.presto.spi.type.DateTimeEncoding.unpackMillisUtc;
 import static com.facebook.presto.spi.type.DateType.DATE;
 import static com.facebook.presto.spi.type.IntegerType.INTEGER;
 import static com.facebook.presto.spi.type.RealType.REAL;
@@ -67,7 +61,6 @@
 import static 
com.facebook.presto.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
 import static com.facebook.presto.spi.type.TinyintType.TINYINT;
 import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 public class PulsarRecordCursor implements RecordCursor {
 
diff --git 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index 4da7ea57f5..0882efc4be 100644
--- 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -59,11 +59,11 @@
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.lang.reflect.Field;
 import java.time.LocalDate;
 import java.time.LocalTime;
 import java.time.ZoneId;
 import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -80,7 +80,6 @@
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -111,6 +110,7 @@
     protected static Map<String, SchemaInfo> topicsToSchemas;
     protected static Map<String, Long> topicsToNumEntries;
 
+
     protected static final NamespaceName NAMESPACE_NAME_1 = 
NamespaceName.get("tenant-1", "ns-1");
     protected static final NamespaceName NAMESPACE_NAME_2 = 
NamespaceName.get("tenant-1", "ns-2");
     protected static final NamespaceName NAMESPACE_NAME_3 = 
NamespaceName.get("tenant-2", "ns-1");
@@ -136,195 +136,440 @@
     protected static final TopicName PARTITIONED_TOPIC_6 = 
TopicName.get("persistent", NAMESPACE_NAME_4,
             "partitioned-topic-2");
 
+
     public static class Foo {
-        int field1;
-        String field2;
-        float field3;
-        double field4;
-        boolean field5;
-        long field6;
+        public static class Bar {
+            public int field1;
+        }
+
+        public int field1;
+        public String field2;
+        public float field3;
+        public double field4;
+        public boolean field5;
+        public long field6;
         @org.apache.avro.reflect.AvroSchema("{ \"type\": \"long\", 
\"logicalType\": \"timestamp-millis\" }")
-        protected long timestamp;
+        public long timestamp;
         @org.apache.avro.reflect.AvroSchema("{ \"type\": \"int\", 
\"logicalType\": \"time-millis\" }")
-        protected int time;
+        public int time;
         @org.apache.avro.reflect.AvroSchema("{ \"type\": \"int\", 
\"logicalType\": \"date\" }")
-        protected int date;
+        public int date;
+        public TestPulsarConnector.Bar bar;
+    }
 
-        public int getField1() {
-            return field1;
-        }
+    public static class Bar {
+        public Integer field1;
+        public String field2;
+        public Boo test;
+        public float field3;
+        public Boo test2;
+    }
 
-        public void setField1(int field1) {
-            this.field1 = field1;
-        }
+    public static class Boo {
+        public Double field4;
+        public Boolean field5;
+        public long field6;
+        // for test cyclic definitions
+        public Foo foo;
+        public Boo boo;
+        public Bar bar;
+        // different namespace with same classname should work though
+        public Foo.Bar foobar;
+    }
 
-        public String getField2() {
-            return field2;
-        }
+    protected static Map<String, Type> fooTypes;
+    protected static List<PulsarColumnHandle> fooColumnHandles;
+    protected static Map<TopicName, PulsarSplit> splits;
+    protected static Map<String, String[]> fooFieldNames;
+    protected static Map<String, Integer[]> fooPositionIndices;
+    protected static Map<String, Function<Integer, Object>> fooFunctions;
 
-        public void setField2(String field2) {
-            this.field2 = field2;
-        }
+    static {
+        try {
+            topicNames = new LinkedList<>();
+            topicNames.add(TOPIC_1);
+            topicNames.add(TOPIC_2);
+            topicNames.add(TOPIC_3);
+            topicNames.add(TOPIC_4);
+            topicNames.add(TOPIC_5);
+            topicNames.add(TOPIC_6);
+
+            partitionedTopicNames = new LinkedList<>();
+            partitionedTopicNames.add(PARTITIONED_TOPIC_1);
+            partitionedTopicNames.add(PARTITIONED_TOPIC_2);
+            partitionedTopicNames.add(PARTITIONED_TOPIC_3);
+            partitionedTopicNames.add(PARTITIONED_TOPIC_4);
+            partitionedTopicNames.add(PARTITIONED_TOPIC_5);
+            partitionedTopicNames.add(PARTITIONED_TOPIC_6);
+
+            partitionedTopicsToPartitions = new HashMap<>();
+            partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_1.toString(), 
2);
+            partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_2.toString(), 
3);
+            partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_3.toString(), 
4);
+            partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_4.toString(), 
5);
+            partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_5.toString(), 
6);
+            partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_6.toString(), 
7);
+
+            topicsToSchemas = new HashMap<>();
+            topicsToSchemas.put(TOPIC_1.getSchemaName(), 
AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(TOPIC_2.getSchemaName(), 
AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(TOPIC_3.getSchemaName(), 
AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(TOPIC_4.getSchemaName(), 
JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(TOPIC_5.getSchemaName(), 
JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(TOPIC_6.getSchemaName(), 
JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+
+
+            topicsToSchemas.put(PARTITIONED_TOPIC_1.getSchemaName(), 
AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+
+            topicsToSchemas.put(PARTITIONED_TOPIC_2.getSchemaName(), 
AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(PARTITIONED_TOPIC_3.getSchemaName(), 
AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(PARTITIONED_TOPIC_4.getSchemaName(), 
JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(PARTITIONED_TOPIC_5.getSchemaName(), 
JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(PARTITIONED_TOPIC_6.getSchemaName(), 
JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+
+            fooTypes = new HashMap<>();
+            fooTypes.put("field1", IntegerType.INTEGER);
+            fooTypes.put("field2", VarcharType.VARCHAR);
+            fooTypes.put("field3", RealType.REAL);
+            fooTypes.put("field4", DoubleType.DOUBLE);
+            fooTypes.put("field5", BooleanType.BOOLEAN);
+            fooTypes.put("field6", BigintType.BIGINT);
+            fooTypes.put("timestamp", TIMESTAMP);
+            fooTypes.put("time", TIME);
+            fooTypes.put("date", DATE);
+            fooTypes.put("bar.field1", IntegerType.INTEGER);
+            fooTypes.put("bar.field2", VarcharType.VARCHAR);
+            fooTypes.put("bar.test.field4", DoubleType.DOUBLE);
+            fooTypes.put("bar.test.field5", BooleanType.BOOLEAN);
+            fooTypes.put("bar.test.field6", BigintType.BIGINT);
+            fooTypes.put("bar.test.foobar.field1", IntegerType.INTEGER);
+            fooTypes.put("bar.field3", RealType.REAL);
+            fooTypes.put("bar.test2.field4", DoubleType.DOUBLE);
+            fooTypes.put("bar.test2.field5", BooleanType.BOOLEAN);
+            fooTypes.put("bar.test2.field6", BigintType.BIGINT);
+            fooTypes.put("bar.test2.foobar.field1", IntegerType.INTEGER);
+
+            topicsToNumEntries = new HashMap<>();
+            topicsToNumEntries.put(TOPIC_1.getSchemaName(), 1233L);
+            topicsToNumEntries.put(TOPIC_2.getSchemaName(), 0L);
+            topicsToNumEntries.put(TOPIC_3.getSchemaName(), 100L);
+            topicsToNumEntries.put(TOPIC_4.getSchemaName(), 12345L);
+            topicsToNumEntries.put(TOPIC_5.getSchemaName(), 8000L);
+            topicsToNumEntries.put(TOPIC_6.getSchemaName(), 1L);
+            topicsToNumEntries.put(PARTITIONED_TOPIC_1.getSchemaName(), 1233L);
+            topicsToNumEntries.put(PARTITIONED_TOPIC_2.getSchemaName(), 8000L);
+            topicsToNumEntries.put(PARTITIONED_TOPIC_3.getSchemaName(), 100L);
+            topicsToNumEntries.put(PARTITIONED_TOPIC_4.getSchemaName(), 0L);
+            topicsToNumEntries.put(PARTITIONED_TOPIC_5.getSchemaName(), 800L);
+            topicsToNumEntries.put(PARTITIONED_TOPIC_6.getSchemaName(), 1L);
+
+            fooFieldNames = new HashMap<>();
+            fooPositionIndices = new HashMap<>();
+            fooColumnHandles = new LinkedList<>();
+
+            String[] fieldNames1 = {"field1"};
+            Integer[] positionIndices1 = {0};
+            fooFieldNames.put("field1", fieldNames1);
+            fooPositionIndices.put("field1", positionIndices1);
+            fooColumnHandles.add(new 
PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "field1",
+                    fooTypes.get("field1"),
+                    false,
+                    false,
+                    fooFieldNames.get("field1"),
+                    fooPositionIndices.get("field1")));
 
-        public float getField3() {
-            return field3;
-        }
 
-        public void setField3(float field3) {
-            this.field3 = field3;
-        }
+            String[] fieldNames2 = {"field2"};
+            Integer[] positionIndices2 = {1};
+            fooFieldNames.put("field2", fieldNames2);
+            fooPositionIndices.put("field2", positionIndices2);
+            fooColumnHandles.add(new 
PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "field2",
+                    fooTypes.get("field2"),
+                    false,
+                    false,
+                    fieldNames2,
+                    positionIndices2));
 
-        public double getField4() {
-            return field4;
-        }
+            String[] fieldNames3 = {"field3"};
+            Integer[] positionIndices3 = {2};
+            fooFieldNames.put("field3", fieldNames3);
+            fooPositionIndices.put("field3", positionIndices3);
+            fooColumnHandles.add(new 
PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "field3",
+                    fooTypes.get("field3"),
+                    false,
+                    false,
+                    fieldNames3,
+                    positionIndices3));
 
-        public void setField4(double field4) {
-            this.field4 = field4;
-        }
+            String[] fieldNames4 = {"field4"};
+            Integer[] positionIndices4 = {3};
+            fooFieldNames.put("field4", fieldNames4);
+            fooPositionIndices.put("field4", positionIndices4);
+            fooColumnHandles.add(new 
PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "field4",
+                    fooTypes.get("field4"),
+                    false,
+                    false,
+                    fieldNames4,
+                    positionIndices4));
 
-        public boolean isField5() {
-            return field5;
-        }
 
-        public void setField5(boolean field5) {
-            this.field5 = field5;
-        }
+            String[] fieldNames5 = {"field5"};
+            Integer[] positionIndices5 = {4};
+            fooFieldNames.put("field5", fieldNames5);
+            fooPositionIndices.put("field5", positionIndices5);
+            fooColumnHandles.add(new 
PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "field5",
+                    fooTypes.get("field5"),
+                    false,
+                    false,
+                    fieldNames5,
+                    positionIndices5));
 
-        public long getField6() {
-            return field6;
-        }
+            String[] fieldNames6 = {"field6"};
+            Integer[] positionIndices6 = {5};
+            fooFieldNames.put("field6", fieldNames6);
+            fooPositionIndices.put("field6", positionIndices6);
+            fooColumnHandles.add(new 
PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "field6",
+                    fooTypes.get("field6"),
+                    false,
+                    false,
+                    fieldNames6,
+                    positionIndices6));
 
-        public void setField6(long field6) {
-            this.field6 = field6;
-        }
+            String[] fieldNames7 = {"timestamp"};
+            Integer[] positionIndices7 = {6};
+            fooFieldNames.put("timestamp", fieldNames7);
+            fooPositionIndices.put("timestamp", positionIndices7);
+            fooColumnHandles.add(new 
PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "timestamp",
+                    fooTypes.get("timestamp"),
+                    false,
+                    false,
+                    fieldNames7,
+                    positionIndices7));
 
-        public long getTimestamp() {
-            return timestamp;
-        }
+            String[] fieldNames8 = {"time"};
+            Integer[] positionIndices8 = {7};
+            fooFieldNames.put("time", fieldNames8);
+            fooPositionIndices.put("time", positionIndices8);
+            fooColumnHandles.add(new 
PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "time",
+                    fooTypes.get("time"),
+                    false,
+                    false,
+                    fieldNames8,
+                    positionIndices8));
 
-        public void setTimestamp(long timestamp) {
-            this.timestamp = timestamp;
-        }
+            String[] fieldNames9 = {"date"};
+            Integer[] positionIndices9 = {8};
+            fooFieldNames.put("date", fieldNames9);
+            fooPositionIndices.put("date", positionIndices9);
+            fooColumnHandles.add(new 
PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "date",
+                    fooTypes.get("date"),
+                    false,
+                    false,
+                    fieldNames9,
+                    positionIndices9));
 
-        public int getTime() {
-            return time;
-        }
+            String[] bar_fieldNames1 = {"bar", "field1"};
+            Integer[] bar_positionIndices1 = {9, 0};
+            fooFieldNames.put("bar.field1", bar_fieldNames1);
+            fooPositionIndices.put("bar.field1", bar_positionIndices1);
+            fooColumnHandles.add(new 
PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "bar.field1",
+                    fooTypes.get("bar.field1"),
+                    false,
+                    false,
+                    bar_fieldNames1,
+                    bar_positionIndices1));
 
-        public void setTime(int time) {
-            this.time = time;
-        }
+            String[] bar_fieldNames2 = {"bar", "field2"};
+            Integer[] bar_positionIndices2 = {9, 1};
+            fooFieldNames.put("bar.field2", bar_fieldNames2);
+            fooPositionIndices.put("bar.field2", bar_positionIndices2);
+            fooColumnHandles.add(new 
PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "bar.field2",
+                    fooTypes.get("bar.field2"),
+                    false,
+                    false,
+                    bar_fieldNames2,
+                    bar_positionIndices2));
 
-        public int getDate() {
-            return date;
-        }
+            String[] bar_test_fieldNames4 = {"bar", "test", "field4"};
+            Integer[] bar_test_positionIndices4 = {9, 2, 0};
+            fooFieldNames.put("bar.test.field4", bar_test_fieldNames4);
+            fooPositionIndices.put("bar.test.field4", 
bar_test_positionIndices4);
+            fooColumnHandles.add(new 
PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "bar.test.field4",
+                    fooTypes.get("bar.test.field4"),
+                    false,
+                    false,
+                    bar_test_fieldNames4,
+                    bar_test_positionIndices4));
 
-        public void setDate(int date) {
-            this.date = date;
-        }
-    }
+            String[] bar_test_fieldNames5 = {"bar", "test", "field5"};
+            Integer[] bar_test_positionIndices5 = {9, 2, 1};
+            fooFieldNames.put("bar.test.field5", bar_test_fieldNames5);
+            fooPositionIndices.put("bar.test.field5", 
bar_test_positionIndices5);
+            fooColumnHandles.add(new 
PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "bar.test.field5",
+                    fooTypes.get("bar.test.field5"),
+                    false,
+                    false,
+                    bar_test_fieldNames5,
+                    bar_test_positionIndices5));
 
-    protected static Map<String, Type> fooTypes;
-    protected static List<PulsarColumnHandle> fooColumnHandles;
-    protected static Map<TopicName, PulsarSplit> splits;
+            String[] bar_test_fieldNames6 = {"bar", "test", "field6"};
+            Integer[] bar_test_positionIndices6 = {9, 2, 2};
+            fooFieldNames.put("bar.test.field6", bar_test_fieldNames6);
+            fooPositionIndices.put("bar.test.field6", 
bar_test_positionIndices6);
+            fooColumnHandles.add(new 
PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "bar.test.field6",
+                    fooTypes.get("bar.test.field6"),
+                    false,
+                    false,
+                    bar_test_fieldNames6,
+                    bar_test_positionIndices6));
 
-    static {
-        topicNames = new LinkedList<>();
-        topicNames.add(TOPIC_1);
-        topicNames.add(TOPIC_2);
-        topicNames.add(TOPIC_3);
-        topicNames.add(TOPIC_4);
-        topicNames.add(TOPIC_5);
-        topicNames.add(TOPIC_6);
-
-        partitionedTopicNames = new LinkedList<>();
-        partitionedTopicNames.add(PARTITIONED_TOPIC_1);
-        partitionedTopicNames.add(PARTITIONED_TOPIC_2);
-        partitionedTopicNames.add(PARTITIONED_TOPIC_3);
-        partitionedTopicNames.add(PARTITIONED_TOPIC_4);
-        partitionedTopicNames.add(PARTITIONED_TOPIC_5);
-        partitionedTopicNames.add(PARTITIONED_TOPIC_6);
-
-        partitionedTopicsToPartitions = new HashMap<>();
-        partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_1.toString(), 2);
-        partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_2.toString(), 3);
-        partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_3.toString(), 4);
-        partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_4.toString(), 5);
-        partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_5.toString(), 6);
-        partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_6.toString(), 7);
-
-        topicsToSchemas = new HashMap<>();
-        topicsToSchemas.put(TOPIC_1.getSchemaName(), 
AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-        topicsToSchemas.put(TOPIC_2.getSchemaName(), 
AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-        topicsToSchemas.put(TOPIC_3.getSchemaName(), 
AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-        topicsToSchemas.put(TOPIC_4.getSchemaName(), 
JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-        topicsToSchemas.put(TOPIC_5.getSchemaName(), 
JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-        topicsToSchemas.put(TOPIC_6.getSchemaName(), 
JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-
-
-        topicsToSchemas.put(PARTITIONED_TOPIC_1.getSchemaName(), 
AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-        topicsToSchemas.put(PARTITIONED_TOPIC_2.getSchemaName(), 
AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-        topicsToSchemas.put(PARTITIONED_TOPIC_3.getSchemaName(), 
AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-        topicsToSchemas.put(PARTITIONED_TOPIC_4.getSchemaName(), 
JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-        topicsToSchemas.put(PARTITIONED_TOPIC_5.getSchemaName(), 
JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-        topicsToSchemas.put(PARTITIONED_TOPIC_6.getSchemaName(), 
JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-
-        fooTypes = new HashMap<>();
-        fooTypes.put("field1", IntegerType.INTEGER);
-        fooTypes.put("field2", VarcharType.VARCHAR);
-        fooTypes.put("field3", RealType.REAL);
-        fooTypes.put("field4", DoubleType.DOUBLE);
-        fooTypes.put("field5", BooleanType.BOOLEAN);
-        fooTypes.put("field6", BigintType.BIGINT);
-        fooTypes.put("timestamp", TIMESTAMP);
-        fooTypes.put("time", TIME);
-        fooTypes.put("date", DATE);
-
-        topicsToNumEntries = new HashMap<>();
-        topicsToNumEntries.put(TOPIC_1.getSchemaName(), 1233L);
-        topicsToNumEntries.put(TOPIC_2.getSchemaName(), 0L);
-        topicsToNumEntries.put(TOPIC_3.getSchemaName(), 100L);
-        topicsToNumEntries.put(TOPIC_4.getSchemaName(), 12345L);
-        topicsToNumEntries.put(TOPIC_5.getSchemaName(), 8000L);
-        topicsToNumEntries.put(TOPIC_6.getSchemaName(), 1L);
-        topicsToNumEntries.put(PARTITIONED_TOPIC_1.getSchemaName(), 1233L);
-        topicsToNumEntries.put(PARTITIONED_TOPIC_2.getSchemaName(), 8000L);
-        topicsToNumEntries.put(PARTITIONED_TOPIC_3.getSchemaName(), 100L);
-        topicsToNumEntries.put(PARTITIONED_TOPIC_4.getSchemaName(), 0L);
-        topicsToNumEntries.put(PARTITIONED_TOPIC_5.getSchemaName(), 800L);
-        topicsToNumEntries.put(PARTITIONED_TOPIC_6.getSchemaName(), 1L);
-
-        fooColumnHandles = new LinkedList<>();
-        for (int i = 0; i < Foo.class.getDeclaredFields().length; i++) {
-            Field field = Foo.class.getDeclaredFields()[i];
+            String[] bar_test_foobar_fieldNames1 = {"bar", "test", "foobar", 
"field1"};
+            Integer[] bar_test_foobar_positionIndices1 = {9, 2, 6, 0};
+            fooFieldNames.put("bar.test.foobar.field1", 
bar_test_foobar_fieldNames1);
+            fooPositionIndices.put("bar.test.foobar.field1", 
bar_test_foobar_positionIndices1);
             fooColumnHandles.add(new 
PulsarColumnHandle(pulsarConnectorId.toString(),
-                    field.getName(),
-                    fooTypes.get(field.getName()),
+                    "bar.test.foobar.field1",
+                    fooTypes.get("bar.test.foobar.field1"),
                     false,
                     false,
-                    i));
-        }
-        
fooColumnHandles.addAll(PulsarInternalColumn.getInternalFields().stream().map(
-                new Function<PulsarInternalColumn, PulsarColumnHandle>() {
-                    @Override
-                    public PulsarColumnHandle apply(PulsarInternalColumn 
pulsarInternalColumn) {
-                        return 
pulsarInternalColumn.getColumnHandle(pulsarConnectorId.toString(), false);
-                    }
-                }).collect(Collectors.toList()));
-
-        splits = new HashMap<>();
-
-        List<TopicName> allTopics = new LinkedList<>();
-        allTopics.addAll(topicNames);
-        allTopics.addAll(partitionedTopicNames);
-
-        for (TopicName topicName : allTopics) {
-            splits.put(topicName, new PulsarSplit(0, 
pulsarConnectorId.toString(),
-                    topicName.getNamespace(), topicName.getLocalName(),
-                    topicsToNumEntries.get(topicName.getSchemaName()),
-                    new 
String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()),
-                    topicsToSchemas.get(topicName.getSchemaName()).getType(),
-                    0, topicsToNumEntries.get(topicName.getSchemaName()),
-                    0, 0, TupleDomain.all()));
+                    bar_test_foobar_fieldNames1,
+                    bar_test_foobar_positionIndices1));
+
+            String[] bar_field3 = {"bar", "field3"};
+            Integer[] bar_positionIndices3 = {9, 3};
+            fooFieldNames.put("bar.field3", bar_field3);
+            fooPositionIndices.put("bar.field3", bar_positionIndices3);
+            fooColumnHandles.add(new 
PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "bar.field3",
+                    fooTypes.get("bar.field3"),
+                    false,
+                    false,
+                    bar_field3,
+                    bar_positionIndices3));
+
+            String[] bar_test2_fieldNames4 = {"bar", "test2", "field4"};
+            Integer[] bar_test2_positionIndices4 = {9, 4, 0};
+            fooFieldNames.put("bar.test2.field4", bar_test2_fieldNames4);
+            fooPositionIndices.put("bar.test2.field4", 
bar_test2_positionIndices4);
+            fooColumnHandles.add(new 
PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "bar.test2.field4",
+                    fooTypes.get("bar.test2.field4"),
+                    false,
+                    false,
+                    bar_test2_fieldNames4,
+                    bar_test2_positionIndices4));
+
+            String[] bar_test2_fieldNames5 = {"bar", "test2", "field5"};
+            Integer[] bar_test2_positionIndices5 = {9, 4, 1};
+            fooFieldNames.put("bar.test2.field5", bar_test2_fieldNames5);
+            fooPositionIndices.put("bar.test2.field5", 
bar_test2_positionIndices5);
+            fooColumnHandles.add(new 
PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "bar.test2.field5",
+                    fooTypes.get("bar.test2.field5"),
+                    false,
+                    false,
+                    bar_test2_fieldNames5,
+                    bar_test2_positionIndices5));
+
+            String[] bar_test2_fieldNames6 = {"bar", "test2", "field6"};
+            Integer[] bar_test2_positionIndices6 = {9, 4, 2};
+            fooFieldNames.put("bar.test2.field6", bar_test2_fieldNames6);
+            fooPositionIndices.put("bar.test2.field6", 
bar_test2_positionIndices6);
+            fooColumnHandles.add(new 
PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "bar.test2.field6",
+                    fooTypes.get("bar.test2.field6"),
+                    false,
+                    false,
+                    bar_test2_fieldNames6,
+                    bar_test2_positionIndices6));
+
+            String[] bar_test2_foobar_fieldNames1 = {"bar", "test2", "foobar", 
"field1"};
+            Integer[] bar_test2_foobar_positionIndices1 = {9, 4, 6, 0};
+            fooFieldNames.put("bar.test2.foobar.field1", 
bar_test2_foobar_fieldNames1);
+            fooPositionIndices.put("bar.test2.foobar.field1", 
bar_test2_foobar_positionIndices1);
+            fooColumnHandles.add(new 
PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "bar.test2.foobar.field1",
+                    fooTypes.get("bar.test2.foobar.field1"),
+                    false,
+                    false,
+                    bar_test2_foobar_fieldNames1,
+                    bar_test2_foobar_positionIndices1));
+
+            
fooColumnHandles.addAll(PulsarInternalColumn.getInternalFields().stream().map(
+                    new Function<PulsarInternalColumn, PulsarColumnHandle>() {
+                        @Override
+                        public PulsarColumnHandle apply(PulsarInternalColumn 
pulsarInternalColumn) {
+                            return 
pulsarInternalColumn.getColumnHandle(pulsarConnectorId.toString(), false);
+                        }
+                    }).collect(Collectors.toList()));
+
+            splits = new HashMap<>();
+
+            List<TopicName> allTopics = new LinkedList<>();
+            allTopics.addAll(topicNames);
+            allTopics.addAll(partitionedTopicNames);
+
+            for (TopicName topicName : allTopics) {
+                splits.put(topicName, new PulsarSplit(0, 
pulsarConnectorId.toString(),
+                        topicName.getNamespace(), topicName.getLocalName(),
+                        topicsToNumEntries.get(topicName.getSchemaName()),
+                        new 
String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()),
+                        
topicsToSchemas.get(topicName.getSchemaName()).getType(),
+                        0, topicsToNumEntries.get(topicName.getSchemaName()),
+                        0, 0, TupleDomain.all()));
+            }
+
+            fooFunctions = new HashMap<>();
+
+            fooFunctions.put("field1", integer -> integer);
+            fooFunctions.put("field2", integer -> String.valueOf(integer));
+            fooFunctions.put("field3", integer -> integer.floatValue());
+            fooFunctions.put("field4", integer -> integer.doubleValue());
+            fooFunctions.put("field5", integer -> integer % 2 == 0);
+            fooFunctions.put("field6", integer -> integer.longValue());
+            fooFunctions.put("timestamp", integer -> 
System.currentTimeMillis());
+            fooFunctions.put("time", integer -> {
+                LocalTime now = LocalTime.now(ZoneId.systemDefault());
+                return now.toSecondOfDay() * 1000;
+            });
+            fooFunctions.put("date", integer -> {
+                LocalDate localDate = LocalDate.now();
+                LocalDate epoch = LocalDate.ofEpochDay(0);
+                return Math.toIntExact(ChronoUnit.DAYS.between(epoch, 
localDate));
+            });
+            fooFunctions.put("bar.field1", integer -> integer % 3 == 0 ? null 
: integer + 1);
+            fooFunctions.put("bar.field2", integer -> integer % 2 == 0 ? null 
: String.valueOf(integer + 2));
+            fooFunctions.put("bar.field3", integer -> integer + 3.0f);
+
+            fooFunctions.put("bar.test.field4", integer -> integer + 1.0);
+            fooFunctions.put("bar.test.field5", integer -> (integer + 1) % 2 
== 0);
+            fooFunctions.put("bar.test.field6", integer -> integer + 10L);
+            fooFunctions.put("bar.test.foobar.field1", integer -> integer % 3);
+
+            fooFunctions.put("bar.test2.field4", integer -> integer + 2.0);
+            fooFunctions.put("bar.test2.field5", integer -> (integer + 1) % 32 
== 0);
+            fooFunctions.put("bar.test2.field6", integer -> integer + 15L);
+            fooFunctions.put("bar.test2.foobar.field1", integer -> integer % 
3);
+
+
+        } catch (Throwable e) {
+            System.out.println("Error: " + e);
+            System.out.println("Stacktrace: " + 
Arrays.asList(e.getStackTrace()));
+            throw e;
         }
     }
 
@@ -550,21 +795,45 @@ public PositionImpl answer(InvocationOnMock 
invocationOnMock) throws Throwable {
                         List<Entry> entries = new LinkedList<>();
                         for (int i = 0; i < readEntries; i++) {
 
+                            Foo.Bar foobar = new Foo.Bar();
+                            foobar.field1 = (int) 
fooFunctions.get("bar.test.foobar.field1").apply(count);
+
+                            Boo boo1 = new Boo();
+                            boo1.field4 = (double) 
fooFunctions.get("bar.test.field4").apply(count);
+                            boo1.field5 = (boolean) 
fooFunctions.get("bar.test.field5").apply(count);
+                            boo1.field6 = (long) 
fooFunctions.get("bar.test.field6").apply(count);
+                            boo1.foo = new Foo();
+                            boo1.boo = null;
+                            boo1.bar = new Bar();
+                            boo1.foobar = foobar;
+
+                            Boo boo2 = new Boo();
+                            boo2.field4 = (double) 
fooFunctions.get("bar.test2.field4").apply(count);
+                            boo2.field5 = (boolean) 
fooFunctions.get("bar.test2.field5").apply(count);
+                            boo2.field6 = (long) 
fooFunctions.get("bar.test2.field6").apply(count);
+                            boo2.foo = new Foo();
+                            boo2.boo = boo1;
+                            boo2.bar = new Bar();
+                            boo2.foobar = foobar;
+
+                            TestPulsarConnector.Bar bar = new 
TestPulsarConnector.Bar();
+                            bar.field1 = 
fooFunctions.get("bar.field1").apply(count) == null ? null : (int) 
fooFunctions.get("bar.field1").apply(count);
+                            bar.field2 = 
fooFunctions.get("bar.field2").apply(count) == null ? null : (String) 
fooFunctions.get("bar.field2").apply(count);
+                            bar.field3 = (float) 
fooFunctions.get("bar.field3").apply(count);
+                            bar.test = boo1;
+                            bar.test2 = count % 2 == 0 ? null : boo2;
+
                             Foo foo = new Foo();
-                            foo.field1 = count;
-                            foo.field2 = String.valueOf(count);
-                            foo.field3 = count;
-                            foo.field4 = count;
-                            foo.field5 = count % 2 == 0;
-                            foo.field6 = count;
-                            foo.timestamp = System.currentTimeMillis();
-
-                            LocalTime now = 
LocalTime.now(ZoneId.systemDefault());
-                            foo.time = now.toSecondOfDay() * 1000;
-
-                            LocalDate localDate = LocalDate.now();
-                            LocalDate epoch = LocalDate.ofEpochDay(0);
-                            foo.date = 
Math.toIntExact(ChronoUnit.DAYS.between(epoch, localDate));
+                            foo.field1 = (int) 
fooFunctions.get("field1").apply(count);
+                            foo.field2 = (String) 
fooFunctions.get("field2").apply(count);
+                            foo.field3 = (float) 
fooFunctions.get("field3").apply(count);
+                            foo.field4 = (double) 
fooFunctions.get("field4").apply(count);
+                            foo.field5 = (boolean) 
fooFunctions.get("field5").apply(count);
+                            foo.field6 = (long) 
fooFunctions.get("field6").apply(count);
+                            foo.timestamp = (long) 
fooFunctions.get("timestamp").apply(count);
+                            foo.time = (int) 
fooFunctions.get("time").apply(count);
+                            foo.date = (int) 
fooFunctions.get("date").apply(count);
+                            foo.bar = bar;
 
                             PulsarApi.MessageMetadata messageMetadata = 
PulsarApi.MessageMetadata.newBuilder()
                                     
.setProducerName("test-producer").setSequenceId(positions.get(topic))
diff --git 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
index 7726763104..17ce4167ec 100644
--- 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
+++ 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
@@ -37,7 +37,6 @@
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -84,16 +83,6 @@ public void testGetTableHandle() {
         Assert.assertEquals(pulsarTableHandle.getTopicName(), 
TOPIC_1.getLocalName());
     }
 
-    @Test
-    public void testGetTableLayouts() {
-
-    }
-
-    @Test
-    public void testGetTableLayout() {
-
-    }
-
     @Test
     public void testGetTableMetadata() {
 
@@ -116,12 +105,9 @@ public void testGetTableMetadata() {
             Assert.assertEquals(tableMetadata.getTable().getTableName(), 
topic.getLocalName());
 
             Assert.assertEquals(tableMetadata.getColumns().size(),
-                    Foo.class.getDeclaredFields().length + 
PulsarInternalColumn.getInternalFields().size());
+                    fooColumnHandles.size());
 
-            List<String> fieldNames = new LinkedList<>();
-            for (Field field : Foo.class.getDeclaredFields()) {
-                fieldNames.add(field.getName());
-            }
+            List<String> fieldNames = new LinkedList<>(fooFieldNames.keySet());
 
             for (PulsarInternalColumn internalField : 
PulsarInternalColumn.getInternalFields()) {
                 fieldNames.add(internalField.getName());
@@ -278,10 +264,7 @@ public void testGetColumnHandles() {
         Map<String, ColumnHandle> columnHandleMap
                 = new 
HashMap<>(this.pulsarMetadata.getColumnHandles(mock(ConnectorSession.class), 
pulsarTableHandle));
 
-        List<String> fieldNames = new LinkedList<>();
-        for (Field field : Foo.class.getDeclaredFields()) {
-            fieldNames.add(field.getName());
-        }
+        List<String> fieldNames = new LinkedList<>(fooFieldNames.keySet());
 
         for (PulsarInternalColumn internalField : 
PulsarInternalColumn.getInternalFields()) {
             fieldNames.add(internalField.getName());
@@ -298,9 +281,9 @@ public void testGetColumnHandles() {
                 Schema schema = new Schema.Parser().parse(new 
String(topicsToSchemas.get(TOPIC_1.getSchemaName())
                         .getSchema()));
                 Assert.assertEquals(pulsarColumnHandle.getConnectorId(), 
pulsarConnectorId.toString());
-                Assert.assertEquals(pulsarColumnHandle.getName(), 
schema.getField(field).name());
-                Assert.assertNotNull(pulsarColumnHandle.getPositionIndex());
-                
Assert.assertEquals(pulsarColumnHandle.getPositionIndex().intValue(), 
schema.getField(field).pos());
+                Assert.assertEquals(pulsarColumnHandle.getName(), field);
+                Assert.assertEquals(pulsarColumnHandle.getPositionIndices(), 
fooPositionIndices.get(field));
+                Assert.assertEquals(pulsarColumnHandle.getFieldNames(), 
fooFieldNames.get(field));
                 Assert.assertEquals(pulsarColumnHandle.getType(), 
fooTypes.get(field));
                 Assert.assertEquals(pulsarColumnHandle.isHidden(), false);
             }
@@ -319,13 +302,10 @@ public void testListTableColumns() {
         List<ColumnMetadata> columnMetadataList
                 = tableColumnsMap.get(new 
SchemaTableName(TOPIC_1.getNamespace(), TOPIC_1.getLocalName()));
         Assert.assertNotNull(columnMetadataList);
-        Assert.assertEquals(columnMetadataList.size(), 
Foo.class.getDeclaredFields().length,
-                PulsarInternalColumn.getInternalFields().size());
+        Assert.assertEquals(columnMetadataList.size(),
+                fooColumnHandles.size());
 
-        List<String> fieldNames = new LinkedList<>();
-        for (Field field : Foo.class.getDeclaredFields()) {
-            fieldNames.add(field.getName());
-        }
+        List<String> fieldNames = new LinkedList<>(fooFieldNames.keySet());
 
         for (PulsarInternalColumn internalField : 
PulsarInternalColumn.getInternalFields()) {
             fieldNames.add(internalField.getName());
@@ -345,13 +325,10 @@ public void testListTableColumns() {
 
         columnMetadataList = tableColumnsMap.get(new 
SchemaTableName(TOPIC_2.getNamespace(), TOPIC_2.getLocalName()));
         Assert.assertNotNull(columnMetadataList);
-        Assert.assertEquals(columnMetadataList.size(), 
Foo.class.getDeclaredFields().length,
-                PulsarInternalColumn.getInternalFields().size());
+        Assert.assertEquals(columnMetadataList.size(),
+                fooColumnHandles.size());
 
-        fieldNames = new LinkedList<>();
-        for (Field field : Foo.class.getDeclaredFields()) {
-            fieldNames.add(field.getName());
-        }
+        fieldNames = new LinkedList<>(fooFieldNames.keySet());
 
         for (PulsarInternalColumn internalField : 
PulsarInternalColumn.getInternalFields()) {
             fieldNames.add(internalField.getName());
@@ -377,13 +354,10 @@ public void testListTableColumns() {
         Assert.assertEquals(tableColumnsMap.size(), 1);
         columnMetadataList = tableColumnsMap.get(new 
SchemaTableName(TOPIC_4.getNamespace(), TOPIC_4.getLocalName()));
         Assert.assertNotNull(columnMetadataList);
-        Assert.assertEquals(columnMetadataList.size(), 
Foo.class.getDeclaredFields().length,
-                PulsarInternalColumn.getInternalFields().size());
+        Assert.assertEquals(columnMetadataList.size(),
+                fooColumnHandles.size());
 
-        fieldNames = new LinkedList<>();
-        for (Field field : Foo.class.getDeclaredFields()) {
-            fieldNames.add(field.getName());
-        }
+        fieldNames = new LinkedList<>(fooFieldNames.keySet());
 
         for (PulsarInternalColumn internalField : 
PulsarInternalColumn.getInternalFields()) {
             fieldNames.add(internalField.getName());
diff --git 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
index 74126a61a5..1b323a2948 100644
--- 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
+++ 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
@@ -19,16 +19,14 @@
 package org.apache.pulsar.sql.presto;
 
 import io.airlift.log.Logger;
-import io.airlift.slice.Slice;
 import org.apache.pulsar.common.naming.TopicName;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import java.nio.charset.Charset;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 
-import static com.facebook.presto.spi.type.BigintType.BIGINT;
-
 @Test(singleThreaded = true)
 public class TestPulsarRecordCursor extends TestPulsarConnector {
 
@@ -44,31 +42,81 @@ public void testTopics() throws Exception {
             PulsarRecordCursor pulsarRecordCursor = entry.getValue();
             TopicName topicName = entry.getKey();
 
-            long count = 0L;
+            int count = 0;
             while (pulsarRecordCursor.advanceNextPosition()) {
+                List<String> columnsSeen = new LinkedList<>();
                 for (int i = 0; i < fooColumnHandles.size(); i++) {
-
                     if (pulsarRecordCursor.isNull(i)) {
-
+                        columnsSeen.add(fooColumnHandles.get(i).getName());
                     } else {
-                        if (fooColumnHandles.get(i).getType().getJavaType() == 
long.class) {
-                            if (fooColumnHandles.get(i).getType() == BIGINT) {
-                                
Assert.assertEquals(pulsarRecordCursor.getLong(i), count);
-                            }
-                        } else if 
(fooColumnHandles.get(i).getType().getJavaType() == boolean.class) {
-                            
Assert.assertEquals(pulsarRecordCursor.getBoolean(i), count % 2 == 0);
-                        } else if 
(fooColumnHandles.get(i).getType().getJavaType() == double.class) {
-                            
Assert.assertEquals(pulsarRecordCursor.getDouble(i), 
Long.valueOf(count).doubleValue());
-                        } else if 
(fooColumnHandles.get(i).getType().getJavaType() == Slice.class) {
-                            if (!fooColumnHandles.get(i).isInternal()) {
-                                
Assert.assertEquals(pulsarRecordCursor.getSlice(i).toStringUtf8().getBytes(),
-                                        
Charset.forName("UTF-8").encode(String.valueOf(count)).array());
-                            }
+                        if 
(fooColumnHandles.get(i).getName().equals("field1")) {
+                            Assert.assertEquals(pulsarRecordCursor.getLong(i), 
((Integer) fooFunctions.get("field1").apply(count)).longValue());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if 
(fooColumnHandles.get(i).getName().equals("field2")) {
+                            
Assert.assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), ((String) 
fooFunctions.get("field2").apply(count)).getBytes());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if 
(fooColumnHandles.get(i).getName().equals("field3")) {
+                            Assert.assertEquals(pulsarRecordCursor.getLong(i), 
Float.floatToIntBits(((Float) 
fooFunctions.get("field3").apply(count)).floatValue()));
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if 
(fooColumnHandles.get(i).getName().equals("field4")) {
+                            
Assert.assertEquals(pulsarRecordCursor.getDouble(i), ((Double) 
fooFunctions.get("field4").apply(count)).doubleValue());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if 
(fooColumnHandles.get(i).getName().equals("field5")) {
+                            
Assert.assertEquals(pulsarRecordCursor.getBoolean(i), ((Boolean) 
fooFunctions.get("field5").apply(count)).booleanValue());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if 
(fooColumnHandles.get(i).getName().equals("field6")) {
+                            Assert.assertEquals(pulsarRecordCursor.getLong(i), 
((Long) fooFunctions.get("field6").apply(count)).longValue());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if 
(fooColumnHandles.get(i).getName().equals("timestamp")) {
+                            pulsarRecordCursor.getLong(i);
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if 
(fooColumnHandles.get(i).getName().equals("time")) {
+                            pulsarRecordCursor.getLong(i);
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if 
(fooColumnHandles.get(i).getName().equals("date")) {
+                            pulsarRecordCursor.getLong(i);
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if 
(fooColumnHandles.get(i).getName().equals("bar.field1")) {
+                            Assert.assertEquals(pulsarRecordCursor.getLong(i), 
((Integer) fooFunctions.get("bar.field1").apply(count)).longValue());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if 
(fooColumnHandles.get(i).getName().equals("bar.field2")) {
+                            
Assert.assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), ((String) 
fooFunctions.get("bar.field2").apply(count)).getBytes());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if 
(fooColumnHandles.get(i).getName().equals("bar.field3")) {
+                            Assert.assertEquals(pulsarRecordCursor.getLong(i), 
Float.floatToIntBits(((Float) 
fooFunctions.get("bar.field3").apply(count)).floatValue()));
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if 
(fooColumnHandles.get(i).getName().equals("bar.test.field4")) {
+                            
Assert.assertEquals(pulsarRecordCursor.getDouble(i), ((Double) 
fooFunctions.get("bar.test.field4").apply(count)).doubleValue());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if 
(fooColumnHandles.get(i).getName().equals("bar.test.field5")) {
+                            
Assert.assertEquals(pulsarRecordCursor.getBoolean(i), ((Boolean) 
fooFunctions.get("bar.test.field5").apply(count)).booleanValue());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if 
(fooColumnHandles.get(i).getName().equals("bar.test.field6")) {
+                            Assert.assertEquals(pulsarRecordCursor.getLong(i), 
((Long) fooFunctions.get("bar.test.field6").apply(count)).longValue());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if 
(fooColumnHandles.get(i).getName().equals("bar.test.foobar.field1")) {
+                            Assert.assertEquals(pulsarRecordCursor.getLong(i), 
((Integer) 
fooFunctions.get("bar.test.foobar.field1").apply(count)).longValue());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if 
(fooColumnHandles.get(i).getName().equals("bar.test2.field4")) {
+                            
Assert.assertEquals(pulsarRecordCursor.getDouble(i), ((Double) 
fooFunctions.get("bar.test2.field4").apply(count)).doubleValue());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if 
(fooColumnHandles.get(i).getName().equals("bar.test2.field5")) {
+                            
Assert.assertEquals(pulsarRecordCursor.getBoolean(i), ((Boolean) 
fooFunctions.get("bar.test2.field5").apply(count)).booleanValue());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if 
(fooColumnHandles.get(i).getName().equals("bar.test2.field6")) {
+                            Assert.assertEquals(pulsarRecordCursor.getLong(i), 
((Long) fooFunctions.get("bar.test2.field6").apply(count)).longValue());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if 
(fooColumnHandles.get(i).getName().equals("bar.test2.foobar.field1")) {
+                            Assert.assertEquals(pulsarRecordCursor.getLong(i), 
((Integer) 
fooFunctions.get("bar.test2.foobar.field1").apply(count)).longValue());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
                         } else {
-                            Assert.fail("Unknown type: " + 
fooColumnHandles.get(i).getType().getJavaType());
+                            if 
(PulsarInternalColumn.getInternalFieldsMap().containsKey(fooColumnHandles.get(i).getName()))
 {
+                                
columnsSeen.add(fooColumnHandles.get(i).getName());
+                            }
                         }
                     }
                 }
+                Assert.assertEquals(columnsSeen.size(), 
fooColumnHandles.size());
                 count++;
             }
             Assert.assertEquals(count, 
topicsToNumEntries.get(topicName.getSchemaName()).longValue());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to