Repository: sqoop Updated Branches: refs/heads/sqoop2 34d7066c3 -> d68c05d3a
SQOOP-1750: Support Map Type in CSV IDF (Veena Basavaraj via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/d68c05d3 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/d68c05d3 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/d68c05d3 Branch: refs/heads/sqoop2 Commit: d68c05d3ad0c99dc3dee2d07b34f919b79dab09d Parents: 34d7066 Author: Jarek Jarcec Cecho <[email protected]> Authored: Mon Dec 1 15:24:02 2014 -0800 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Mon Dec 1 15:24:02 2014 -0800 ---------------------------------------------------------------------- .../idf/CSVIntermediateDataFormat.java | 79 ++++++++- .../idf/TestCSVIntermediateDataFormat.java | 166 ++++++++++++++++++- 2 files changed, 236 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/d68c05d3/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java index 4f2baf9..bdab7a4 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java @@ -30,6 +30,7 @@ import org.apache.sqoop.schema.type.FloatingPoint; import org.joda.time.LocalDate; import org.joda.time.LocalDateTime; import org.json.simple.JSONArray; +import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import java.io.DataInput; @@ -39,8 +40,11 @@ import java.io.UnsupportedEncodingException; import java.math.BigDecimal; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.regex.Matcher; @@ -85,6 +89,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> { private final List<Integer> stringTypeColumnIndices = new ArrayList<Integer>(); private final List<Integer> byteTypeColumnIndices = new ArrayList<Integer>(); private final List<Integer> listTypeColumnIndices = new ArrayList<Integer>(); + private final List<Integer> mapTypeColumnIndices = new ArrayList<Integer>(); private Schema schema; @@ -129,6 +134,8 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> { byteTypeColumnIndices.add(i); } else if (isColumnListType(col)) { listTypeColumnIndices.add(i); + } else if (col.getType() == ColumnType.MAP) { + mapTypeColumnIndices.add(i); } i++; } @@ -147,7 +154,6 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> { boolean quoted = false; boolean escaped = false; - boolean insideJSON = false; List<String> parsedData = new LinkedList<String>(); StringBuilder builder = new StringBuilder(); @@ -167,7 +173,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> { escaped = !escaped; break; case SEPARATOR_CHARACTER: - if (quoted || insideJSON) { + if (quoted) { builder.append(c); } else { parsedData.add(builder.toString()); @@ -217,12 +223,12 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> { objectArray[i] = null; continue; } - objectArray[i] = parseStringArrayElement(fieldStringArray[i], columnArray[i]); + objectArray[i] = parseCSVStringArrayElement(fieldStringArray[i], columnArray[i]); } return objectArray; } - private Object parseStringArrayElement(String fieldString, Column column) { + private Object parseCSVStringArrayElement(String fieldString, Column column) { Object returnValue = null; switch (column.getType()) { @@ -271,6 +277,9 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> { case SET: returnValue = parseListElementFromJSON(fieldString); break; + case MAP: + returnValue = parseMapElementFromJSON(fieldString); + break; default: throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004, "Column type from schema was not recognized for " + column.getType()); @@ -287,11 +296,60 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> { throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0008, e); } if (array != null) { - return array.toArray(); + return array.toArray(); + } + return null; + } + + private Map<Object, Object> parseMapElementFromJSON(String fieldString) { + + JSONObject object = null; + try { + object = (JSONObject) new JSONParser().parse(removeQuotes(fieldString)); + } catch (org.json.simple.parser.ParseException e) { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0008, e); + } + if (object != null) { + return toMap(object); } return null; } + private List<Object> toList(JSONArray array) { + List<Object> list = new ArrayList<Object>(); + for (int i = 0; i < array.size(); i++) { + Object value = array.get(i); + if (value instanceof JSONArray) { + value = toList((JSONArray) value); + } + + else if (value instanceof JSONObject) { + value = toMap((JSONObject) value); + } + list.add(value); + } + return list; + } + + @SuppressWarnings("unchecked") + private Map<Object, Object> toMap(JSONObject object) { + Map<Object, Object> elementMap = new HashMap<Object, Object>(); + Set<Map.Entry<Object, Object>> entries = object.entrySet(); + for (Map.Entry<Object, Object> entry : entries) { + Object value = entry.getValue(); + + if (value instanceof JSONArray) { + value = toList((JSONArray) value); + } + + else if (value instanceof JSONObject) { + value = toMap((JSONObject) value); + } + elementMap.put(entry.getKey(), value); + } + return elementMap; + } + /** * Appends the actual java objects into CSV string {@inheritDoc} */ @@ -351,6 +409,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> { * * @param stringArray */ + @SuppressWarnings("unchecked") private void encodeCSVStringElements(Object[] stringArray, Column[] columnArray) { for (int i : stringTypeColumnIndices) { stringArray[i] = escapeString((String) stringArray[i]); @@ -361,6 +420,16 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> { for (int i : listTypeColumnIndices) { stringArray[i] = encodeList((Object[]) stringArray[i], columnArray[i]); } + for (int i : mapTypeColumnIndices) { + stringArray[i] = encodeMap((Map<Object, Object>) stringArray[i], columnArray[i]); + } + } + + @SuppressWarnings("unchecked") + private String encodeMap(Map<Object, Object> map, Column column) { + JSONObject object = new JSONObject(); + object.putAll(map); + return encloseWithQuote(object.toJSONString()); } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/sqoop/blob/d68c05d3/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java index b629897..bd082aa 100644 --- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java +++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java @@ -26,12 +26,15 @@ import static org.junit.Assert.assertTrue; import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.Array; import org.apache.sqoop.schema.type.Binary; import org.apache.sqoop.schema.type.Bit; import org.apache.sqoop.schema.type.Date; @@ -43,8 +46,6 @@ import org.junit.Test; public class TestCSVIntermediateDataFormat { - private final String BYTE_FIELD_ENCODING = "ISO-8859-1"; - private IntermediateDataFormat<?> dataFormat; @Before @@ -54,8 +55,10 @@ public class TestCSVIntermediateDataFormat { private String getByteFieldString(byte[] byteFieldData) { try { - return new StringBuilder("'").append(new String(byteFieldData, BYTE_FIELD_ENCODING)).append("'").toString(); - } catch(UnsupportedEncodingException e) { + return new StringBuilder("'") + .append(new String(byteFieldData, CSVIntermediateDataFormat.BYTE_FIELD_CHARSET)) + .append("'").toString(); + } catch (UnsupportedEncodingException e) { // Should never get to this point because ISO-8859-1 is a standard codec. return null; } @@ -566,7 +569,162 @@ public class TestCSVIntermediateDataFormat { String expected = "'[\"[11, 12]\",\"[14, 15]\"]','text'"; assertEquals(expected, dataFormat.getTextData()); } + //**************test cases for map********************** + @Test + public void testMapWithSimpleValueWithObjectArrayInObjectArrayOut() { + Schema schema = new Schema("test"); + schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value"))); + schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); + dataFormat.setSchema(schema); + Map<Object, Object> map = new HashMap<Object, Object>(); + map.put("testKey", "testValue"); + // create an array inside the object array + Object[] data = new Object[2]; + data[0] = map; + data[1] = "text"; + dataFormat.setObjectData(data); + @SuppressWarnings("unchecked") + Map<Object, Object> expectedMap = (Map<Object, Object>) dataFormat.getObjectData()[0]; + assertEquals(map, expectedMap); + assertEquals("text", dataFormat.getObjectData()[1]); + } + + @Test + public void testMapWithComplexIntegerListValueWithObjectArrayInObjectArrayOut() { + Schema schema = new Schema("test"); + schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Array("value", + new FixedPoint("number")))); + schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); + dataFormat.setSchema(schema); + Map<Object, Object> givenMap = new HashMap<Object, Object>(); + List<Integer> intList = new ArrayList<Integer>(); + intList.add(11); + intList.add(12); + givenMap.put("testKey", intList); + // create an array inside the object array + Object[] data = new Object[2]; + data[0] = givenMap; + data[1] = "text"; + dataFormat.setObjectData(data); + @SuppressWarnings("unchecked") + Map<Object, Object> expectedMap = (Map<Object, Object>) dataFormat.getObjectData()[0]; + assertEquals(givenMap.toString(), expectedMap.toString()); + assertEquals("text", dataFormat.getObjectData()[1]); + } + + @Test + public void testMapWithComplexStringListValueWithObjectArrayInObjectArrayOut() { + Schema schema = new Schema("test"); + schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Array("value", + new Text("text")))); + schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); + dataFormat.setSchema(schema); + Map<Object, Object> givenMap = new HashMap<Object, Object>(); + List<String> stringList = new ArrayList<String>(); + stringList.add("A"); + stringList.add("A"); + givenMap.put("testKey", stringList); + // create an array inside the object array + Object[] data = new Object[2]; + data[0] = givenMap; + data[1] = "text"; + dataFormat.setObjectData(data); + @SuppressWarnings("unchecked") + Map<Object, Object> expectedMap = (Map<Object, Object>) dataFormat.getObjectData()[0]; + assertEquals(givenMap.toString(), expectedMap.toString()); + assertEquals("text", dataFormat.getObjectData()[1]); + } + + @Test + public void testMapWithComplexMapValueWithObjectArrayInObjectArrayOut() { + Schema schema = new Schema("test"); + schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Array("value", + new Text("text")))); + schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); + dataFormat.setSchema(schema); + Map<Object, Object> givenMap = new HashMap<Object, Object>(); + List<String> stringList = new ArrayList<String>(); + stringList.add("A"); + stringList.add("A"); + Map<String, List<String>> anotherMap = new HashMap<String, List<String>>(); + anotherMap.put("anotherKey", stringList); + givenMap.put("testKey", anotherMap); + // create an array inside the object array + Object[] data = new Object[2]; + data[0] = givenMap; + data[1] = "text"; + dataFormat.setObjectData(data); + @SuppressWarnings("unchecked") + Map<Object, Object> expectedMap = (Map<Object, Object>) dataFormat.getObjectData()[0]; + assertEquals(givenMap.toString(), expectedMap.toString()); + assertEquals("text", dataFormat.getObjectData()[1]); + } + + @Test + public void testMapWithCSVTextInObjectArrayOut() { + Schema schema = new Schema("test"); + schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value"))); + schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); + dataFormat.setSchema(schema); + Map<Object, Object> givenMap = new HashMap<Object, Object>(); + givenMap.put("testKey", "testValue"); + Object[] data = new Object[2]; + data[0] = givenMap; + data[1] = "text"; + String testData = "'{\"testKey\":\"testValue\"}','text'"; + dataFormat.setTextData(testData); + @SuppressWarnings("unchecked") + Map<Object, Object> expectedMap = (Map<Object, Object>) dataFormat.getObjectData()[0]; + assertEquals(givenMap, expectedMap); + assertEquals("text", dataFormat.getObjectData()[1]); + } + + @Test + public void testMapWithComplexValueWithCSVTextInObjectArrayOut() { + Schema schema = new Schema("test"); + schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value"))); + schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); + dataFormat.setSchema(schema); + Map<Object, Object> givenMap = new HashMap<Object, Object>(); + givenMap.put("testKey", "testValue"); + Object[] data = new Object[2]; + data[0] = givenMap; + data[1] = "text"; + String testData = "'{\"testKey\":\"testValue\"}','text'"; + dataFormat.setTextData(testData); + @SuppressWarnings("unchecked") + Map<Object, Object> expectedMap = (Map<Object, Object>) dataFormat.getObjectData()[0]; + assertEquals(givenMap, expectedMap); + assertEquals("text", dataFormat.getObjectData()[1]); + } + + @Test + public void testMapWithObjectArrayInCSVTextOut() { + Schema schema = new Schema("test"); + schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value"))); + schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); + dataFormat.setSchema(schema); + Map<Object, Object> givenMap = new HashMap<Object, Object>(); + givenMap.put("testKey", "testValue"); + Object[] data = new Object[2]; + data[0] = givenMap; + data[1] = "text"; + String testData = "'{\"testKey\":\"testValue\"}','text'"; + dataFormat.setObjectData(data); + assertEquals(testData, dataFormat.getTextData()); + } + + @Test + public void testMapWithCSVTextInCSVTextOut() { + Schema schema = new Schema("test"); + schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value"))); + schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); + dataFormat.setSchema(schema); + String testData = "'{\"testKey\":\"testValue\"}','text'"; + dataFormat.setTextData(testData); + assertEquals(testData, dataFormat.getTextData()); + } //**************test cases for schema******************* @Test(expected=SqoopException.class) public void testEmptySchema() {
