[ https://issues.apache.org/jira/browse/BEAM-2990?focusedWorklogId=91433&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91433 ]

ASF GitHub Bot logged work on BEAM-2990:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 16/Apr/18 18:49
            Start Date: 16/Apr/18 18:49
    Worklog Time Spent: 10m 
      Work Description: XuMingmin closed pull request #5079: [BEAM-2990] 
support MAP in SQL schema
URL: https://github.com/apache/beam/pull/5079
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
index f32b6ce5d84..5caa6464556 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
@@ -96,9 +96,19 @@ private static long estimatedSizeBytes(FieldType 
typeDescriptor, Object value) {
         List list = (List) value;
         long listSizeBytes = 0;
         for (Object elem : list) {
-          listSizeBytes += 
estimatedSizeBytes(typeDescriptor.getComponentType(), elem);
+          listSizeBytes += 
estimatedSizeBytes(typeDescriptor.getCollectionElementType(), elem);
         }
         return 4 + listSizeBytes;
+      case MAP:
+        Map<Object, Object> map = (Map<Object, Object>) value;
+        long mapSizeBytes = 0;
+        for (Map.Entry<Object, Object> elem : map.entrySet()) {
+          mapSizeBytes += 
typeDescriptor.getMapKeyType().equals(TypeName.STRING)
+                ? ((String) elem.getKey()).length()
+                  : ESTIMATED_FIELD_SIZES.get(typeDescriptor.getMapKeyType());
+          mapSizeBytes += estimatedSizeBytes(typeDescriptor.getMapValueType(), 
elem.getValue());
+        }
+        return 4 + mapSizeBytes;
       case STRING:
         // Not always accurate - String.getBytes().length() would be more 
accurate here, but slower.
         return ((String) value).length();
@@ -121,7 +131,10 @@ public Schema getSchema() {
 
   Coder getCoder(FieldType fieldType) {
     if (TypeName.ARRAY.equals(fieldType.getTypeName())) {
-      return ListCoder.of(getCoder(fieldType.getComponentType()));
+      return ListCoder.of(getCoder(fieldType.getCollectionElementType()));
+    } else if (TypeName.MAP.equals(fieldType.getTypeName())) {
+      return MapCoder.of(coderForPrimitiveType(fieldType.getMapKeyType()),
+          getCoder(fieldType.getMapValueType()));
     } else if (TypeName.ROW.equals((fieldType.getTypeName()))) {
       return RowCoder.of(fieldType.getRowSchema());
     } else {
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
index 3a7bc346883..436941b3875 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
@@ -122,8 +122,9 @@ public Builder addBooleanField(String name, boolean 
nullable) {
       return this;
     }
 
-    public Builder addArrayField(String name, FieldType componentType) {
-      fields.add(Field.of(name, 
TypeName.ARRAY.type().withComponentType(componentType)));
+    public Builder addArrayField(String name, FieldType collectionElementType) 
{
+      fields.add(
+          Field.of(name, 
TypeName.ARRAY.type().withCollectionElementType(collectionElementType)));
       return this;
     }
 
@@ -199,6 +200,7 @@ public int hashCode() {
     DATETIME, // Date and time.
     BOOLEAN,  // Boolean.
     ARRAY,
+    MAP,
     ROW;    // The field is itself a nested row.
 
     private final FieldType fieldType = FieldType.of(this);
@@ -207,9 +209,13 @@ public int hashCode() {
         BYTE, INT16, INT32, INT64, DECIMAL, FLOAT, DOUBLE);
     public static final Set<TypeName> STRING_TYPES = ImmutableSet.of(STRING);
     public static final Set<TypeName> DATE_TYPES = ImmutableSet.of(DATETIME);
-    public static final Set<TypeName> CONTAINER_TYPES = ImmutableSet.of(ARRAY);
+    public static final Set<TypeName> COLLECTION_TYPES = 
ImmutableSet.of(ARRAY);
+    public static final Set<TypeName> MAP_TYPES = ImmutableSet.of(MAP);
     public static final Set<TypeName> COMPOSITE_TYPES = ImmutableSet.of(ROW);
 
+    public boolean isPrimitiveType() {
+      return !isCollectionType() && !isMapType() && !isCompositeType();
+    }
     public boolean isNumericType() {
       return NUMERIC_TYPES.contains(this);
     }
@@ -219,8 +225,11 @@ public boolean isStringType() {
     public boolean isDateType() {
       return DATE_TYPES.contains(this);
     }
-    public boolean isContainerType() {
-      return CONTAINER_TYPES.contains(this);
+    public boolean isCollectionType() {
+      return COLLECTION_TYPES.contains(this);
+    }
+    public boolean isMapType() {
+      return MAP_TYPES.contains(this);
     }
     public boolean isCompositeType() {
       return COMPOSITE_TYPES.contains(this);
@@ -241,7 +250,11 @@ public FieldType type() {
     // Returns the type of this field.
     public abstract TypeName getTypeName();
     // For container types (e.g. ARRAY), returns the type of the contained 
element.
-    @Nullable public abstract FieldType getComponentType();
+    @Nullable public abstract FieldType getCollectionElementType();
+    // For MAP type, returns the type of the key element, it must be a 
primitive type;
+    @Nullable public abstract TypeName getMapKeyType();
+    // For MAP type, returns the type of the value element, it can be a nested 
type;
+    @Nullable public abstract FieldType getMapValueType();
     // For ROW types, returns the schema for the row.
     @Nullable public abstract Schema getRowSchema();
     /**
@@ -252,7 +265,9 @@ public FieldType type() {
     @AutoValue.Builder
     abstract static class Builder {
       abstract Builder setTypeName(TypeName typeName);
-      abstract Builder setComponentType(@Nullable FieldType componentType);
+      abstract Builder setCollectionElementType(@Nullable FieldType 
collectionElementType);
+      abstract Builder setMapKeyType(@Nullable TypeName mapKeyType);
+      abstract Builder setMapValueType(@Nullable FieldType mapValueType);
       abstract Builder setRowSchema(@Nullable Schema rowSchema);
       abstract Builder setMetadata(@Nullable byte[] metadata);
       abstract FieldType build();
@@ -268,11 +283,24 @@ public static FieldType of(TypeName typeName) {
     /**
      * For container types, adds the type of the component element.
      */
-    public FieldType withComponentType(@Nullable FieldType componentType) {
-      if (componentType != null) {
-        checkArgument(getTypeName().isContainerType());
+    public FieldType withCollectionElementType(@Nullable FieldType 
collectionElementType) {
+      if (collectionElementType != null) {
+        checkArgument(getTypeName().isCollectionType());
+      }
+      return 
toBuilder().setCollectionElementType(collectionElementType).build();
+    }
+
+    /**
+     * For MAP type, adds the type of the component key/value element.
+     */
+    public FieldType withMapType(@Nullable TypeName mapKeyType,
+        @Nullable FieldType mapValueType) {
+      if (mapKeyType != null && mapValueType != null) {
+        checkArgument(getTypeName().isMapType());
+        checkArgument(mapKeyType.isPrimitiveType());
       }
-      return toBuilder().setComponentType(componentType).build();
+      return toBuilder().setMapKeyType(mapKeyType)
+          .setMapValueType(mapValueType).build();
     }
 
     /**
@@ -306,7 +334,9 @@ public boolean equals(Object o) {
       }
       FieldType other = (FieldType) o;
       return Objects.equals(getTypeName(), other.getTypeName())
-          && Objects.equals(getComponentType(), other.getComponentType())
+          && Objects.equals(getCollectionElementType(), 
other.getCollectionElementType())
+          && Objects.equals(getMapKeyType(), other.getMapKeyType())
+          && Objects.equals(getMapValueType(), other.getMapValueType())
           && Objects.equals(getRowSchema(), other.getRowSchema())
           && Arrays.equals(getMetadata(), other.getMetadata());
 
@@ -314,8 +344,8 @@ public boolean equals(Object o) {
 
     @Override
     public int hashCode() {
-      return Arrays.deepHashCode(
-          new Object[] {getTypeName(), getComponentType(), getRowSchema(), 
getMetadata()});
+      return Arrays.deepHashCode(new Object[] { getTypeName(), 
getCollectionElementType(),
+          getMapKeyType(), getMapValueType(), getRowSchema(), getMetadata() });
     }
   }
 
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonDeserializer.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonDeserializer.java
index b66fc91bb37..3006bda4927 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonDeserializer.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonDeserializer.java
@@ -230,7 +230,7 @@ boolean isArrayType() {
     }
 
     FieldType arrayElementType() {
-      return type().getComponentType();
+      return type().getCollectionElementType();
     }
 
     boolean isJsonObject() {
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonValidation.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonValidation.java
index 137f4dc1ce7..27a5929e938 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonValidation.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonValidation.java
@@ -54,8 +54,8 @@ static void verifyFieldTypeSupported(Schema.FieldType 
fieldType) {
       return;
     }
 
-    if (fieldTypeName.isContainerType()) {
-      verifyFieldTypeSupported(fieldType.getComponentType());
+    if (fieldTypeName.isCollectionType()) {
+      verifyFieldTypeSupported(fieldType.getCollectionElementType());
       return;
     }
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
index a3420a476f2..9b3ce3efceb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
@@ -21,12 +21,15 @@
 
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import java.io.Serializable;
 import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.stream.Collector;
 import javax.annotation.Nullable;
@@ -182,6 +185,14 @@ public boolean getBoolean(String fieldName) {
     return getArray(getSchema().indexOf(fieldName));
   }
 
+  /**
+   * Get a MAP value by field name, {@link IllegalStateException} is thrown
+   * if schema doesn't match.
+   */
+  public <T1, T2> Map<T1, T2> getMap(String fieldName) {
+    return getMap(getSchema().indexOf(fieldName));
+  }
+
   /**
    * Get a {@link TypeName#ROW} value by field name, {@link 
IllegalStateException} is thrown
    * if schema doesn't match.
@@ -279,6 +290,14 @@ public boolean getBoolean(int idx) {
     return getValue(idx);
   }
 
+  /**
+   * Get a MAP value by field index, {@link IllegalStateException} is thrown
+   * if schema doesn't match.
+   */
+  public <T1, T2> Map<T1, T2> getMap(int idx) {
+    return getValue(idx);
+  }
+
   /**
    * Get a {@link Row} value by field index, {@link IllegalStateException} is 
thrown
    * if schema doesn't match.
@@ -341,6 +360,11 @@ public static Builder withSchema(Schema schema) {
       this.schema = schema;
     }
 
+    public Builder addValue(Object values) {
+      this.values.add(values);
+      return this;
+    }
+
     public Builder addValues(List<Object> values) {
       this.values.addAll(values);
       return this;
@@ -378,23 +402,28 @@ public Builder addArray(Object ... values) {
           }
           verifiedValues.add(null);
         } else {
-          FieldType type = field.getType();
-          if (TypeName.ARRAY.equals(type.getTypeName())) {
-            List<Object> arrayElements = verifyArray(
-                value, type.getComponentType(), field.getName());
-            verifiedValues.add(arrayElements);
-          } else if (TypeName.ROW.equals(type.getTypeName())) {
-            verifiedValues.add(verifyRow(value, field.getName()));
-          } else {
-            verifiedValues.add(verifyPrimitiveType(value, type.getTypeName(),
-                field.getName()));
-          }
+          verifiedValues.add(verify(value, field.getType(), field.getName()));
         }
       }
       return verifiedValues;
     }
 
-    private List<Object> verifyArray(Object value, FieldType componentType,
+    private Object verify(Object value, FieldType type, String fieldName) {
+      if (TypeName.ARRAY.equals(type.getTypeName())) {
+        List<Object> arrayElements = verifyArray(value, 
type.getCollectionElementType(), fieldName);
+        return arrayElements;
+      } else if (TypeName.MAP.equals(type.getTypeName())) {
+        Map<Object, Object> mapElements = verifyMap(value, 
type.getMapKeyType(),
+            type.getMapValueType(), fieldName);
+        return mapElements;
+      } else if (TypeName.ROW.equals(type.getTypeName())) {
+        return verifyRow(value, fieldName);
+      } else {
+        return verifyPrimitiveType(value, type.getTypeName(), fieldName);
+      }
+    }
+
+    private List<Object> verifyArray(Object value, FieldType 
collectionElementType,
                                      String fieldName) {
       if (!(value instanceof List)) {
         throw new IllegalArgumentException(
@@ -404,19 +433,27 @@ public Builder addArray(Object ... values) {
       List<Object> valueList = (List<Object>) value;
       List<Object> verifiedList = 
Lists.newArrayListWithCapacity(valueList.size());
       for (Object listValue : valueList) {
-        if (TypeName.ARRAY.equals(componentType.getTypeName())) {
-          verifiedList.add(verifyArray(listValue, 
componentType.getComponentType(),
-              fieldName + "nested"));
-        } else if (TypeName.ROW.equals(componentType.getTypeName())) {
-          verifiedList.add(verifyRow(listValue, fieldName));
-        } else {
-          verifiedList.add(verifyPrimitiveType(listValue,
-              componentType.getTypeName(), fieldName));
-        }
+        verifiedList.add(verify(listValue, collectionElementType, fieldName));
       }
       return verifiedList;
     }
 
+    private Map<Object, Object> verifyMap(Object value, TypeName keyTypeName,
+        FieldType valueType, String fieldName) {
+      if (!(value instanceof Map)) {
+        throw new IllegalArgumentException(String.format(
+            "For field name %s and map type expected Map class. Instead " + 
"class type was %s.",
+            fieldName, value.getClass()));
+      }
+      Map<Object, Object> valueMap = (Map<Object, Object>) value;
+      Map<Object, Object> verifiedMap = 
Maps.newHashMapWithExpectedSize(valueMap.size());
+      for (Entry<Object, Object> kv : valueMap.entrySet()) {
+        verifiedMap.put(verifyPrimitiveType(kv.getKey(), keyTypeName, 
fieldName),
+            verify(kv.getValue(), valueType, fieldName));
+      }
+      return verifiedMap;
+    }
+
     private Row verifyRow(Object value, String fieldName) {
       if (!(value instanceof Row)) {
         throw new IllegalArgumentException(
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/org/apache/beam/sdk/coders/RowCoderTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/org/apache/beam/sdk/coders/RowCoderTest.java
index 950302bbf30..cd1d96aebc4 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/org/apache/beam/sdk/coders/RowCoderTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/org/apache/beam/sdk/coders/RowCoderTest.java
@@ -101,8 +101,8 @@ public void testArrayOfRow() throws Exception {
     Schema nestedSchema = Schema.builder()
         .addInt32Field("f1_int", false)
         .addStringField("f1_str", false).build();
-    FieldType componentType = TypeName.ROW.type().withRowSchema(nestedSchema);
-    Schema schema = Schema.builder().addArrayField("f_array", 
componentType).build();
+    FieldType collectionElementType = 
TypeName.ROW.type().withRowSchema(nestedSchema);
+    Schema schema = Schema.builder().addArrayField("f_array", 
collectionElementType).build();
     Row row = Row.withSchema(schema).addArray(
         Row.withSchema(nestedSchema).addValues(1, "one").build(),
         Row.withSchema(nestedSchema).addValues(2, "two").build(),
@@ -114,8 +114,8 @@ public void testArrayOfRow() throws Exception {
   @Test
   public void testArrayOfArray() throws Exception {
     FieldType arrayType = TypeName.ARRAY.type()
-        .withComponentType(TypeName.ARRAY.type()
-            .withComponentType(TypeName.INT32.type()));
+        .withCollectionElementType(TypeName.ARRAY.type()
+            .withCollectionElementType(TypeName.INT32.type()));
     Schema schema = Schema.builder().addField(Field.of("f_array", 
arrayType)).build();
     Row row = Row.withSchema(schema).addArray(
         Lists.newArrayList(1, 2, 3, 4),
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java
index b439f912586..f33edc65d20 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java
@@ -108,7 +108,7 @@ public void testNestedSchema() {
   @Test
   public void testArraySchema() {
     FieldType arrayType = TypeName.ARRAY.type()
-        .withComponentType(TypeName.STRING.type());
+        .withCollectionElementType(TypeName.STRING.type());
     Schema schema = Schema.of(Field.of("f_array", arrayType));
     Field field = schema.getField("f_array");
     assertEquals("f_array", field.getName());
@@ -120,7 +120,7 @@ public void testArrayOfRowSchema() {
     Schema nestedSchema = Schema.of(
         Field.of("f1_str", TypeName.STRING.type()));
     FieldType arrayType = TypeName.ARRAY.type()
-        .withComponentType(TypeName.ROW.type()
+        .withCollectionElementType(TypeName.ROW.type()
             .withRowSchema(nestedSchema));
     Schema schema = Schema.of(Field.of("f_array", arrayType));
     Field field = schema.getField("f_array");
@@ -131,8 +131,8 @@ public void testArrayOfRowSchema() {
   @Test
   public void testNestedArraySchema() {
     FieldType arrayType = TypeName.ARRAY.type()
-        .withComponentType(TypeName.ARRAY.type()
-            .withComponentType(TypeName.STRING.type()));
+        .withCollectionElementType(TypeName.ARRAY.type()
+            .withCollectionElementType(TypeName.STRING.type()));
     Schema schema = Schema.of(Field.of("f_array", arrayType));
     Field field = schema.getField("f_array");
     assertEquals("f_array", field.getName());
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonDeserializerTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonDeserializerTest.java
index 3e2a6e46a7a..a93b3f81e32 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonDeserializerTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonDeserializerTest.java
@@ -144,7 +144,7 @@ public void testParsesArrayOfArrays() throws Exception {
         Schema
             .builder()
             .addArrayField("f_arrayOfIntArrays",
-                           FieldType.of(ARRAY).withComponentType(INT32.type()))
+                           
FieldType.of(ARRAY).withCollectionElementType(INT32.type()))
             .build();
 
     String rowString = "{\n"
@@ -174,7 +174,7 @@ public void testThrowsForMismatchedArrayField() throws 
Exception {
         Schema
             .builder()
             .addArrayField("f_arrayOfIntArrays",
-                           FieldType.of(ARRAY).withComponentType(INT32.type()))
+                           
FieldType.of(ARRAY).withCollectionElementType(INT32.type()))
             .build();
 
     String rowString = "{\n"
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java
index 443c8cc88b8..7389aae67ca 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java
@@ -25,7 +25,10 @@
 
 import com.google.common.collect.Lists;
 import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Stream;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
@@ -134,7 +137,7 @@ public void testCreatesArray() {
     Schema type = Stream
         .of(Schema.Field.of("array",
             TypeName.ARRAY.type()
-                .withComponentType(TypeName.INT32.type())))
+                .withCollectionElementType(TypeName.INT32.type())))
         .collect(toSchema());
     Row row = Row.withSchema(type).addArray(data).build();
     assertEquals(data, row.getArray("array"));
@@ -153,7 +156,7 @@ public void testCreatesRowArray() {
     Schema type = Stream
         .of(Schema.Field.of("array",
             TypeName.ARRAY.type()
-                .withComponentType(TypeName.ROW.type()
+                .withCollectionElementType(TypeName.ROW.type()
                     .withRowSchema(nestedType))))
         .collect(toSchema());
     Row row = Row.withSchema(type).addArray(data).build();
@@ -167,13 +170,110 @@ public void testCreatesArrayArray() {
     Schema type = Stream
         .of(Schema.Field.of("array",
             TypeName.ARRAY.type()
-                .withComponentType(TypeName.ARRAY.type()
-                    .withComponentType(TypeName.INT32.type()))))
+                .withCollectionElementType(TypeName.ARRAY.type()
+                    .withCollectionElementType(TypeName.INT32.type()))))
         .collect(toSchema());
     Row row = Row.withSchema(type).addArray(data).build();
     assertEquals(data, row.getArray("array"));
   }
 
+  @Test
+  public void testCreatesArrayOfMap() {
+    List<Map<Integer, String>> data = Lists
+        .<Map<Integer, String>>newArrayList(Lists.newArrayList(new 
HashMap<Integer, String>() {
+          {
+            put(1, "value1");
+          }
+        }, new HashMap<Integer, String>() {
+          {
+            put(2, "value2");
+          }
+        }));
+    Schema type = Stream
+        .of(Schema.Field.of("array",
+            TypeName.ARRAY.type().withCollectionElementType(
+                TypeName.MAP.type().withMapType(TypeName.INT32, 
TypeName.STRING.type()))))
+        .collect(toSchema());
+    Row row = Row.withSchema(type).addArray(data).build();
+    assertEquals(data, row.getArray("array"));
+  }
+
+  @Test
+  public void testCreateMapWithPrimitiveValue() {
+    Map<Integer, String> data = new HashMap<Integer, String>() {
+      {
+        put(1, "value1");
+        put(2, "value2");
+        put(3, "value3");
+        put(4, "value4");
+      }
+    };
+    Schema type = Stream
+        .of(Schema.Field.of("map",
+            TypeName.MAP.type().withMapType(TypeName.INT32, 
TypeName.STRING.type())))
+        .collect(toSchema());
+    Row row = Row.withSchema(type).addValue(data).build();
+    assertEquals(data, row.getMap("map"));
+  }
+
+  @Test
+  public void testCreateMapWithArrayValue() {
+    Map<Integer, List<String>> data = new HashMap<Integer, List<String>>() {
+      {
+        put(1, Arrays.asList("value1"));
+        put(2, Arrays.asList("value2"));
+      }
+    };
+    Schema type = Stream
+        .of(Schema.Field.of("map",
+            TypeName.MAP.type().withMapType(TypeName.INT32,
+                
TypeName.ARRAY.type().withCollectionElementType(TypeName.STRING.type()))))
+        .collect(toSchema());
+    Row row = Row.withSchema(type).addValue(data).build();
+    assertEquals(data, row.getMap("map"));
+  }
+
+  @Test
+  public void testCreateMapWithMapValue() {
+    Map<Integer, Map<Integer, String>> data = new HashMap<Integer, 
Map<Integer, String>>() {
+      {
+        put(1, new HashMap<Integer, String>() {
+          {
+            put(1, "value1");
+          }
+        });
+        put(2, new HashMap<Integer, String>() {
+          {
+            put(2, "value2");
+          }
+        });
+      }
+    };
+    Schema type = Stream
+        .of(Schema.Field.of("map",
+            TypeName.MAP.type().withMapType(TypeName.INT32,
+                TypeName.MAP.type().withMapType(TypeName.INT32, 
TypeName.STRING.type()))))
+        .collect(toSchema());
+    Row row = Row.withSchema(type).addValue(data).build();
+    assertEquals(data, row.getMap("map"));
+  }
+
+  @Test
+  public void testCreateMapWithRowValue() {
+    Schema nestedType = Stream.of(Schema.Field.of("f1_str", 
TypeName.STRING.type()))
+        .collect(toSchema());
+    Map<Integer, Row> data = new HashMap<Integer, Row>() {
+      {
+        put(1, Row.withSchema(nestedType).addValues("one").build());
+        put(2, Row.withSchema(nestedType).addValues("two").build());
+      }
+    };
+    Schema type = Stream.of(Schema.Field.of("map", 
TypeName.MAP.type().withMapType(TypeName.INT32,
+        TypeName.ROW.type().withRowSchema(nestedType)))).collect(toSchema());
+    Row row = Row.withSchema(type).addValue(data).build();
+    assertEquals(data, row.getMap("map"));
+  }
+
   @Test
   public void testCollector() {
     Schema type =
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlTypes.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlTypes.java
index c6bec341fd8..7be646c81d7 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlTypes.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlTypes.java
@@ -131,7 +131,7 @@ public Builder withTimestampField(String fieldName) {
     }
 
     /**
-     * Adds an ARRAY field with elements of the give type.
+     * Adds an ARRAY field with elements of the given type.
      */
     public Builder withArrayField(String fieldName, RelDataType relDataType) {
       builder.addField(Field.of(fieldName, 
CalciteUtils.toArrayType(relDataType)));
@@ -139,23 +139,42 @@ public Builder withArrayField(String fieldName, 
RelDataType relDataType) {
     }
 
     /**
-     * Adds an ARRAY field with elements of the give type.
+     * Adds an ARRAY field with elements of the given type.
      */
     public Builder withArrayField(String fieldName, SqlTypeName typeName) {
       builder.addField(Field.of(fieldName, 
CalciteUtils.toArrayType(typeName)));
       return this;
     }
 
+    /**
+     * Adds a MAP field with elements of the given key/value type.
+     */
+    public Builder withMapField(String fieldName, RelDataType keyRelDataType,
+        RelDataType valueRelDataType) {
+      builder
+          .addField(Field.of(fieldName, CalciteUtils.toMapType(keyRelDataType, 
valueRelDataType)));
+      return this;
+    }
+
+    /**
+     * Adds a MAP field with elements of the given key/value type.
+     */
+    public Builder withMapField(String fieldName, SqlTypeName keyTypeName,
+        SqlTypeName valueTypeName) {
+      builder.addField(Field.of(fieldName, CalciteUtils.toMapType(keyTypeName, 
valueTypeName)));
+      return this;
+    }
+
     /**
      * Adds an ARRAY field with elements of {@code rowType}.
      */
     public Builder withArrayField(String fieldName, Schema schema) {
-      FieldType componentType =
+      FieldType collectionElementType =
           FieldType
               .of(TypeName.ROW)
               .withRowSchema(schema);
       builder.addField(Field.of(fieldName,
-          TypeName.ARRAY.type().withComponentType(componentType)));
+          
TypeName.ARRAY.type().withCollectionElementType(collectionElementType)));
       return this;
     }
 
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
index 86e8c5d235d..9de107b7d16 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -61,6 +61,8 @@
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlAndExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlNotExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlOrExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.map.BeamSqlMapExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.map.BeamSqlMapItemExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlAbsExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlAcosExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlAsinExpression;
@@ -405,9 +407,19 @@ static BeamSqlExpression buildExpression(RexNode rexNode) {
         // array functions
         case "ARRAY":
           return new BeamSqlArrayExpression(subExps);
+       // map functions
+        case "MAP":
+          return new BeamSqlMapExpression(subExps);
 
         case "ITEM":
+        switch (subExps.get(0).getOutputType()) {
+        case MAP:
+          return new BeamSqlMapItemExpression(subExps, 
node.type.getSqlTypeName());
+        case ARRAY:
           return new BeamSqlArrayItemExpression(subExps, 
node.type.getSqlTypeName());
+        default:
+          throw new UnsupportedOperationException("Operator: " + opName + " is 
not supported yet");
+        }
 
         // collections functions
         case "ELEMENT":
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
index b64685919a8..3afd4c4f06a 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
@@ -19,6 +19,7 @@
 
 import java.math.BigDecimal;
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -146,6 +147,8 @@ public boolean accept() {
       return true;
     case ARRAY:
       return value instanceof List;
+    case MAP:
+      return value instanceof Map;
     case ROW:
       return value instanceof Row;
     default:
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/array/BeamSqlArrayItemExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/array/BeamSqlArrayItemExpression.java
index 3ca90700c23..739a0b69b12 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/array/BeamSqlArrayItemExpression.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/array/BeamSqlArrayItemExpression.java
@@ -38,7 +38,7 @@ public BeamSqlArrayItemExpression(
 
   @Override
   public boolean accept() {
-    return operands.size() == 2;
+    return operands.size() == 2 && 
op(0).getOutputType().equals(SqlTypeName.ARRAY);
   }
 
   @Override
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/map/BeamSqlMapExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/map/BeamSqlMapExpression.java
new file mode 100644
index 00000000000..d310a1f00e9
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/map/BeamSqlMapExpression.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.map;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Represents MAP expression in SQL.
+ */
+public class BeamSqlMapExpression extends BeamSqlExpression {
+  public BeamSqlMapExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.MAP);
+  }
+
+  @Override
+  public boolean accept() {
+    return
+        operands
+            .stream()
+            .map(BeamSqlExpression::getOutputType)
+            .distinct()
+            .count() == 1;
+  }
+
+  @Override
+  public BeamSqlPrimitive evaluate(Row inputRow, BoundedWindow window) {
+    Map<Object, Object> elements = new HashMap<>();
+    for (int idx = 0; idx < operands.size() / 2; ++idx) {
+      elements.put(operands.get(idx * 2).evaluate(inputRow, window).getValue(),
+          operands.get(idx * 2 + 1).evaluate(inputRow, window).getValue());
+    }
+    return BeamSqlPrimitive.of(outputType, elements);
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/map/BeamSqlMapItemExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/map/BeamSqlMapItemExpression.java
new file mode 100644
index 00000000000..eaf3905a7bf
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/map/BeamSqlMapItemExpression.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.map;
+
+import java.util.List;
+import java.util.Map;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Implements map key access expression.
+ */
+public class BeamSqlMapItemExpression extends BeamSqlExpression {
+
+  public BeamSqlMapItemExpression(
+      List<BeamSqlExpression> operands,
+      SqlTypeName sqlTypeName) {
+
+    super(operands, sqlTypeName);
+  }
+
+  @Override
+  public boolean accept() {
+    return operands.size() == 2 && 
op(0).getOutputType().equals(SqlTypeName.MAP);
+  }
+
+  @Override
+  public BeamSqlPrimitive evaluate(Row inputRow, BoundedWindow window) {
+    Map<Object, Object> map = opValueEvaluated(0, inputRow, window);
+    Object key = opValueEvaluated(1, inputRow, window);
+
+    return BeamSqlPrimitive.of(outputType, map.get(key));
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/map/package-info.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/map/package-info.java
new file mode 100644
index 00000000000..a6f6d68cece
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/map/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Expressions implementing map operations.
+ */
+@DefaultAnnotation(NonNull.class)
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.map;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index 44525d36d0c..140b1c10531 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -57,6 +57,7 @@
 
           .put(TypeName.BOOLEAN.type(), SqlTypeName.BOOLEAN)
 
+          .put(TypeName.MAP.type(), SqlTypeName.MAP)
           .put(TypeName.ARRAY.type(), SqlTypeName.ARRAY)
           .put(TypeName.ROW.type(), SqlTypeName.ROW)
           .put(TypeName.DATETIME.type().withMetadata("DATE"), SqlTypeName.DATE)
@@ -93,7 +94,7 @@ public static Schema toBeamSchema(RelDataType tableInfo) {
 
   public static SqlTypeName toSqlTypeName(FieldType type) {
     SqlTypeName typeName = BEAM_TO_CALCITE_TYPE_MAPPING.get(
-        type.withComponentType(null).withRowSchema(null));
+        
type.withCollectionElementType(null).withRowSchema(null).withMapType(null, 
null));
     if (typeName != null) {
       return typeName;
     } else {
@@ -110,20 +111,34 @@ public static FieldType toFieldType(SqlTypeName 
sqlTypeName) {
   public static FieldType toFieldType(RelDataType calciteType) {
     FieldType type = toFieldType((calciteType.getSqlTypeName()));
     if (calciteType.getComponentType() != null) {
-      type = 
type.withComponentType(toFieldType(calciteType.getComponentType()));
+      type = 
type.withCollectionElementType(toFieldType(calciteType.getComponentType()));
     }
     if (calciteType.isStruct()) {
       type = type.withRowSchema(toBeamSchema(calciteType));
     }
+    if (calciteType.getKeyType() != null && calciteType.getValueType() != 
null) {
+      type = 
type.withMapType(toFieldType(calciteType.getKeyType()).getTypeName(),
+          toFieldType(calciteType.getValueType()));
+    }
     return type;
   }
 
-  public static FieldType toArrayType(SqlTypeName componentType) {
-    return TypeName.ARRAY.type().withComponentType(toFieldType(componentType));
+  public static FieldType toArrayType(SqlTypeName collectionElementType) {
+    return 
TypeName.ARRAY.type().withCollectionElementType(toFieldType(collectionElementType));
+  }
+
+  public static FieldType toArrayType(RelDataType collectionElementType) {
+    return 
TypeName.ARRAY.type().withCollectionElementType(toFieldType(collectionElementType));
+  }
+
+  public static FieldType toMapType(SqlTypeName componentKeyType, SqlTypeName 
componentValueType) {
+    return 
TypeName.MAP.type().withMapType(toFieldType(componentKeyType).getTypeName(),
+        toFieldType(componentValueType));
   }
 
-  public static FieldType toArrayType(RelDataType componentType) {
-    return TypeName.ARRAY.type().withComponentType(toFieldType(componentType));
+  public static FieldType toMapType(RelDataType componentKeyType, RelDataType 
componentValueType) {
+    return 
TypeName.MAP.type().withMapType(toFieldType(componentKeyType).getTypeName(),
+        toFieldType(componentValueType));
   }
 
   public static Schema.Field toBeamSchemaField(RelDataTypeField calciteField) {
@@ -152,9 +167,15 @@ private static RelDataType toRelDataType(
       RelDataTypeFactory dataTypeFactory, FieldType fieldType) {
     SqlTypeName typeName = toSqlTypeName(fieldType);
     if (SqlTypeName.ARRAY.equals(typeName)) {
-      RelDataType componentType = toRelDataType(
-          dataTypeFactory, fieldType.getComponentType());
-      return dataTypeFactory.createArrayType(componentType, 
UNLIMITED_ARRAY_SIZE);
+      RelDataType collectionElementType = toRelDataType(
+          dataTypeFactory, fieldType.getCollectionElementType());
+      return dataTypeFactory.createArrayType(collectionElementType, 
UNLIMITED_ARRAY_SIZE);
+    } else if (SqlTypeName.MAP.equals(typeName)) {
+      RelDataType componentKeyType = toRelDataType(
+          dataTypeFactory, fieldType.getMapKeyType().type());
+      RelDataType componentValueType = toRelDataType(
+          dataTypeFactory, fieldType.getMapValueType());
+      return dataTypeFactory.createMapType(componentKeyType, 
componentValueType);
     } else if (SqlTypeName.ROW.equals(typeName)) {
       return toCalciteRowType(fieldType.getRowSchema(), dataTypeFactory);
     } else {
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlMapTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlMapTest.java
new file mode 100644
index 00000000000..859ec8897e1
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlMapTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.util.HashMap;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Tests for SQL MAP type.
+ */
+public class BeamSqlMapTest {
+
+  private static final Schema INPUT_ROW_TYPE = 
RowSqlTypes.builder().withIntegerField("f_int")
+      .withMapField("f_intStringMap", SqlTypeName.VARCHAR, 
SqlTypeName.INTEGER).build();
+
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+  @Rule
+  public ExpectedException exceptions = ExpectedException.none();
+
+  @Test
+  public void testSelectAll() {
+    PCollection<Row> input = pCollectionOf2Elements();
+
+    Schema resultType = RowSqlTypes.builder().withIntegerField("f_int")
+        .withMapField("f_map", SqlTypeName.VARCHAR, 
SqlTypeName.INTEGER).build();
+
+    PCollection<Row> result = input.apply("sqlQuery",
+        BeamSql.query("SELECT f_int, f_intStringMap as f_map FROM 
PCOLLECTION"));
+
+    PAssert.that(result)
+        .containsInAnyOrder(Row.withSchema(resultType).addValues(1, new 
HashMap<String, Integer>() {
+          {
+            put("key11", 11);
+            put("key22", 22);
+          }
+        }).build(),
+
+            Row.withSchema(resultType).addValues(2, new HashMap<String, 
Integer>() {
+              {
+                put("key33", 33);
+                put("key44", 44);
+                put("key55", 55);
+              }
+            }).build());
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testSelectMapField() {
+    PCollection<Row> input = pCollectionOf2Elements();
+
+    Schema resultType = RowSqlTypes.builder().withIntegerField("f_int")
+        .withMapField("f_intStringMap", SqlTypeName.VARCHAR, 
SqlTypeName.INTEGER).build();
+
+    PCollection<Row> result = input.apply("sqlQuery",
+        BeamSql.query("SELECT 42, MAP['aa', 1] as `f_map` FROM PCOLLECTION"));
+
+    PAssert.that(result).containsInAnyOrder(
+        Row.withSchema(resultType).addValues(42, new HashMap<String, 
Integer>() {
+          {
+            put("aa", 1);
+          }
+        }).build(), Row.withSchema(resultType).addValues(42, new 
HashMap<String, Integer>() {
+          {
+            put("aa", 1);
+          }
+        }).build());
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testAccessMapElement() {
+    PCollection<Row> input = pCollectionOf2Elements();
+
+    Schema resultType = 
RowSqlTypes.builder().withIntegerField("f_mapElem").build();
+
+    PCollection<Row> result = input.apply("sqlQuery",
+        BeamSql.query("SELECT f_intStringMap['key11'] FROM PCOLLECTION"));
+
+    
PAssert.that(result).containsInAnyOrder(Row.withSchema(resultType).addValues(11).build(),
+        Row.withSchema(resultType).addValue(null).build());
+
+    pipeline.run();
+  }
+
+  private PCollection<Row> pCollectionOf2Elements() {
+    return PBegin.in(pipeline).apply("boundedInput1", Create
+        .of(Row.withSchema(INPUT_ROW_TYPE).addValues(1).addValue(new 
HashMap<String, Integer>() {
+          {
+            put("key11", 11);
+            put("key22", 22);
+          }
+        }).build(),
+            Row.withSchema(INPUT_ROW_TYPE).addValues(2).addValue(new 
HashMap<String, Integer>() {
+              {
+                put("key33", 33);
+                put("key44", 44);
+                put("key55", 55);
+              }
+            }).build())
+        .withCoder(INPUT_ROW_TYPE.getRowCoder()));
+  }
+}
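
For reference, a minimal sketch (not part of the merged change) of how the MAP support added above is used from the Java SDK, mirroring the new RowTest cases; the class and field names are illustrative:

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.values.Row;

public class RowMapSketch {
  public static void main(String[] args) {
    // A field of type MAP<INT32, STRING>: the key must be a primitive TypeName,
    // the value is a full FieldType (so nested value types are allowed).
    Schema schema = Schema.of(
        Field.of("f_map",
            TypeName.MAP.type().withMapType(TypeName.INT32, TypeName.STRING.type())));

    Map<Integer, String> data = new HashMap<>();
    data.put(1, "value1");
    data.put(2, "value2");

    // Builder.addValue(Object) and Row.getMap(...) are introduced by this change.
    Row row = Row.withSchema(schema).addValue(data).build();
    Map<Integer, String> roundTripped = row.getMap("f_map");
    System.out.println(roundTripped);
  }
}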


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 91433)
    Time Spent: 6h 10m  (was: 6h)

> support data type MAP
> ---------------------
>
>                 Key: BEAM-2990
>                 URL: https://issues.apache.org/jira/browse/BEAM-2990
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql
>            Reporter: Xu Mingmin
>            Assignee: Xu Mingmin
>            Priority: Major
>          Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> support Non-scalar types:
> MAP   Collection of keys mapped to values
> ARRAY         Ordered, contiguous collection that may contain duplicates
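
For illustration, a hedged sketch of the SQL-level MAP usage the merged PR enables, modeled on the new BeamSqlMapTest above (the pipeline setup, runner, and field names are assumptions for illustration):

import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.BeamSql;
import org.apache.beam.sdk.extensions.sql.RowSqlTypes;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.calcite.sql.type.SqlTypeName;

public class BeamSqlMapSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    // A row type with a MAP<VARCHAR, INTEGER> column, as in BeamSqlMapTest.
    Schema inputType = RowSqlTypes.builder()
        .withIntegerField("f_int")
        .withMapField("f_intStringMap", SqlTypeName.VARCHAR, SqlTypeName.INTEGER)
        .build();

    PCollection<Row> input = PBegin.in(p).apply("input",
        Create.of(
                Row.withSchema(inputType)
                    .addValues(1)
                    .addValue(ImmutableMap.of("key11", 11, "key22", 22))
                    .build())
            .withCoder(inputType.getRowCoder()));

    // Read one entry by key; MAP['k', v] literals work in projections as well.
    input.apply("sqlQuery",
        BeamSql.query("SELECT f_intStringMap['key11'] FROM PCOLLECTION"));

    p.run().waitUntilFinish();
  }
}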



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
