Repository: sqoop Updated Branches: refs/heads/sqoop2 dc50e4074 -> 60066b8f3
SQOOP-1968: Optimize schema operation in getMatchingData of NameMatcher (Qian Xu via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/60066b8f Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/60066b8f Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/60066b8f Branch: refs/heads/sqoop2 Commit: 60066b8f3873cd322735a7b822b9eac16e24c0fc Parents: dc50e40 Author: Jarek Jarcec Cecho <[email protected]> Authored: Wed Jan 7 12:12:26 2015 +0100 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Wed Jan 7 12:13:06 2015 +0100 ---------------------------------------------------------------------- .../connector/matcher/LocationMatcher.java | 46 +++----- .../apache/sqoop/connector/matcher/Matcher.java | 15 +++ .../sqoop/connector/matcher/NameMatcher.java | 67 +++++------ .../sqoop/connector/matcher/SchemaFixture.java | 61 ++++++++++ .../connector/matcher/TestLocationMatcher.java | 110 ++++++++++++++++++ .../connector/matcher/TestNameMatcher.java | 116 +++++++++++++++++++ 6 files changed, 350 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/60066b8f/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/LocationMatcher.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/LocationMatcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/LocationMatcher.java index 01adaf0..d92723e 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/LocationMatcher.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/LocationMatcher.java @@ -17,62 +17,44 @@ */ package org.apache.sqoop.connector.matcher; -import org.apache.log4j.Logger; -import 
org.apache.sqoop.common.SqoopException; import org.apache.sqoop.schema.Schema; -import org.apache.sqoop.schema.SchemaError; import org.apache.sqoop.schema.type.Column; - /** - * Convert data according to FROM schema to data according to TO schema - * This is done based on column location - * So data in first column in FROM goes into first column in TO, etc - * If TO schema has more fields and they are "nullable", the value will be set to null - * If TO schema has extra non-null fields, we'll throw an exception + * Convert data according to FROM schema to data according to TO schema. This is + * done based on column location. Data in first column in FROM goes into first + * column in TO, etc. If TO schema has more fields and they are "nullable", + * their values will be set to null. If TO schema has extra non-null fields, we + * will throw an exception. */ public class LocationMatcher extends Matcher { - public static final Logger LOG = Logger.getLogger(LocationMatcher.class); - public LocationMatcher(Schema from, Schema to) { super(from, to); } @Override public Object[] getMatchingData(Object[] fields) { - - Object[] out = new Object[getToSchema().getColumnsCount()]; - - int i = 0; - if (getToSchema().isEmpty()) { - // If there's no destination schema, no need to convert anything - // Just use the original data + // No destination schema found. No need to convert anything. return fields; } - for (Column col: getToSchema().getColumnsArray()) { + Object[] out = new Object[getToSchema().getColumnsCount()]; + int i = 0; + + for (Column col : getToSchema().getColumnsList()) { if (i < fields.length) { - if (isNull(fields[i])) { - out[i] = null; - } else { - out[i] = fields[i]; - } + Object value = fields[i]; + out[i] = isNull(value) ? 
null : value; } // We ran out of fields before we ran out of schema else { - if (!col.getNullable()) { - throw new SqoopException(SchemaError.SCHEMA_0004,"target column " + col + " didn't match with any source column and cannot be null"); - } else { - LOG.warn("Column " + col + " has no matching source column. Will be ignored. "); - out[i] = null; - } + tryFillNullInArrayForUnexpectedColumn(col, out, i); } i++; } return out; } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/60066b8f/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java index 39e0007..554e415 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java @@ -17,12 +17,16 @@ */ package org.apache.sqoop.connector.matcher; +import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.schema.ByteArraySchema; import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.SchemaError; +import org.apache.sqoop.schema.type.Column; public abstract class Matcher { + private static final Logger LOG = Logger.getLogger(Matcher.class); private final Schema fromSchema; private final Schema toSchema; @@ -67,5 +71,16 @@ public abstract class Matcher { return false; } + protected void tryFillNullInArrayForUnexpectedColumn(Column column, + Object[] array, int index) throws SqoopException { + if (!column.getNullable()) { + throw new SqoopException(SchemaError.SCHEMA_0004, "Target column " + + column + " didn't match with any source column and cannot be null."); + } + + LOG.warn("Column " + column + + " has no matching 
source column. Will be ignored."); + array[index] = null; + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/60066b8f/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/NameMatcher.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/NameMatcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/NameMatcher.java index c01b916..7cbc39f 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/NameMatcher.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/NameMatcher.java @@ -17,58 +17,59 @@ */ package org.apache.sqoop.connector.matcher; -import org.apache.log4j.Logger; -import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.schema.Schema; -import org.apache.sqoop.schema.SchemaError; import org.apache.sqoop.schema.type.Column; import java.util.HashMap; +/** + * Convert data according to FROM schema to data according to TO schema. This is + * done based on column name. If TO schema has more fields and they are + * "nullable", their values will be set to null. If TO schema has extra non-null + * fields, we will throw an exception. 
+ */ public class NameMatcher extends Matcher { - public static final Logger LOG = Logger.getLogger(NameMatcher.class); + private HashMap<String, Integer> fromColNameIndexMap; public NameMatcher(Schema from, Schema to) { super(from, to); + + fromColNameIndexMap = new HashMap<String, Integer>(); + int fromIndex = 0; + + for (Column fromCol : getFromSchema().getColumnsList()) { + fromColNameIndexMap.put(fromCol.getName(), fromIndex); + fromIndex++; + } } @Override public Object[] getMatchingData(Object[] fields) { - Object[] out = new Object[getToSchema().getColumnsCount()]; - - HashMap<String,Column> colNames = new HashMap<String, Column>(); - - for (Column fromCol: getFromSchema().getColumnsArray()) { - colNames.put(fromCol.getName(), fromCol); + if (getToSchema().isEmpty()) { + // No destination schema found. No need to convert anything. + return fields; } - int toIndex = 0; - - for (Column toCol: getToSchema().getColumnsArray()) { - Column fromCol = colNames.get(toCol.getName()); + Object[] out = new Object[getToSchema().getColumnsCount()]; + int i = 0; - if (fromCol != null) { - int fromIndex = getFromSchema().getColumnsList().indexOf(fromCol); - if (isNull(fields[fromIndex])) { - out[toIndex] = null; - } else { - out[toIndex] = fields[fromIndex]; - } - } else { - //column exists in TO schema but not in FROM schema - if (toCol.getNullable() == false) { - throw new SqoopException(SchemaError.SCHEMA_0004,"target column " + toCol + " didn't match with any source column and cannot be null"); - } else { - LOG.warn("Column " + toCol + " has no matching source column. Will be ignored. "); - out[toIndex] = null; + for (Column toCol : getToSchema().getColumnsList()) { + boolean assigned = false; + if (fromColNameIndexMap.containsKey(toCol.getName())) { + int fromIndex = fromColNameIndexMap.get(toCol.getName()); + if (fromIndex < fields.length) { + Object value = fields[fromIndex]; + out[i] = isNull(value) ? 
null : value; + assigned = true; } } - - toIndex++; + if (!assigned) { + tryFillNullInArrayForUnexpectedColumn(toCol, out, i); + } + i++; } - - return out; + return out; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/60066b8f/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/SchemaFixture.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/SchemaFixture.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/SchemaFixture.java new file mode 100644 index 0000000..f20b851 --- /dev/null +++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/SchemaFixture.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.connector.matcher; + +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.DateTime; +import org.apache.sqoop.schema.type.Text; +import org.joda.time.LocalDateTime; + +class SchemaFixture { + + public static Schema createSchema1(String name) { + Schema schema = new Schema(name); + schema.addColumn(new Text("text1")); + schema.addColumn(new DateTime("datetime1", false, false)); + return schema; + } + + public static Object[] createNotNullRecordForSchema1() { + Object[] fields = new Object[2]; + fields[0] = "some text"; + fields[1] = new LocalDateTime("2015-01-01"); + return fields; + } + + public static Schema createSchema(String name, String[] columnNames) { + Schema schema = new Schema(name); + for (String columnName : columnNames) { + if (columnName.startsWith("datetime")) { + schema.addColumn(new DateTime(columnName, false, false)); + } else { + schema.addColumn(new Text(columnName)); + } + } + return schema; + } + + public static Schema createSchema(String name, int numOfColumn) { + Schema schema = new Schema(name); + for (int i = 0; i < numOfColumn; i++) { + schema.addColumn(new Text("text" + i)); + } + return schema; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/60066b8f/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/TestLocationMatcher.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/TestLocationMatcher.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/TestLocationMatcher.java new file mode 100644 index 0000000..edf5fd9 --- /dev/null +++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/TestLocationMatcher.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.matcher; + +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.schema.NullSchema; +import org.apache.sqoop.schema.Schema; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; + +public class TestLocationMatcher { + + private LocationMatcher matcher; + + @Before + public void setUp() { + matcher = null; + } + + /** + * FROM and TO schemas are identical, fields should be copied directly. + */ + @Test + public void testPerfectMatch() { + matcher = new LocationMatcher( + SchemaFixture.createSchema1("from"), + SchemaFixture.createSchema1("to")); + Object[] fields = SchemaFixture.createNotNullRecordForSchema1(); + + Object[] actual = matcher.getMatchingData(fields); + assertArrayEquals(fields, actual); + } + + /** + * When no FROM schema is specified, fields should be copied directly. 
+ */ + @Test + public void testDirectFieldsCopy() { + matcher = new LocationMatcher( + NullSchema.getInstance(), + SchemaFixture.createSchema1("to")); + Object[] fields = SchemaFixture.createNotNullRecordForSchema1(); + + Object[] actual = matcher.getMatchingData(fields); + assertArrayEquals(fields, actual); + } + + /** + * If a field contains any "nullable" value, it should be converted to null. + */ + @Test + public void testNullableFieldConvert() { + matcher = new LocationMatcher( + SchemaFixture.createSchema("from", 5), + SchemaFixture.createSchema("to", 5)); + Object[] fields = new Object[] {null, "NULL", "null", "'null'", ""}; + + Object[] actual = matcher.getMatchingData(fields); + assertArrayEquals(new Object[] {null, null, null, null, null}, actual); + } + + /** + * If TO schema has more fields than FROM schema, and all of the extra fields + * are "nullable", their values will be set to null. + */ + @Test + public void testConvertWhenToSchemaIsLongerThanFromSchema() { + matcher = new LocationMatcher( + SchemaFixture.createSchema("from", 2), + SchemaFixture.createSchema("to", 3)); + Object[] fields = new Object[] {"t1", "t2"}; + + Object[] actual = matcher.getMatchingData(fields); + assertArrayEquals(new Object[] {"t1", "t2", null}, actual); + } + + /** + * If TO schema has more fields than FROM schema, and NOT all of the extra + * fields are "nullable", a SqoopException is expected. 
+ */ + @Test (expected = SqoopException.class) + public void testConvertWhenToSchemaIsLongerThanFromSchemaFail() { + Schema from = SchemaFixture.createSchema("from", 2); + Schema to = SchemaFixture.createSchema("to", 4); + to.getColumnsList().get(2).setNullable(true); + to.getColumnsList().get(3).setNullable(false); + matcher = new LocationMatcher(from, to); + Object[] fields = new Object[] {"t1", "t2"}; + + matcher.getMatchingData(fields); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/60066b8f/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/TestNameMatcher.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/TestNameMatcher.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/TestNameMatcher.java new file mode 100644 index 0000000..8d5d720 --- /dev/null +++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/TestNameMatcher.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.connector.matcher; + +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.schema.NullSchema; +import org.apache.sqoop.schema.Schema; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; + +public class TestNameMatcher { + + private NameMatcher matcher; + + @Before + public void setUp() { + matcher = null; + } + + /** + * FROM and TO schemas are identical, fields should be copied directly. + */ + @Test + public void testPerfectMatch() { + matcher = new NameMatcher( + SchemaFixture.createSchema1("from"), + SchemaFixture.createSchema1("to")); + Object[] fields = SchemaFixture.createNotNullRecordForSchema1(); + + Object[] actual = matcher.getMatchingData(fields); + assertArrayEquals(fields, actual); + } + + /** + * When no FROM schema is specified, fields should be copied directly. + */ + @Test + public void testDirectFieldsCopy() { + matcher = new NameMatcher( + NullSchema.getInstance(), + SchemaFixture.createSchema1("to")); + Object[] fields = SchemaFixture.createNotNullRecordForSchema1(); + + Object[] actual = matcher.getMatchingData(fields); + assertArrayEquals(fields, actual); + } + + /** + * If a field contains any "nullable" value, it should be converted to null. + */ + @Test + public void testNullableFieldConvert() { + matcher = new NameMatcher( + SchemaFixture.createSchema("from", + new String[]{"text1", "text2", "text3", "text4", "text5"}), + SchemaFixture.createSchema("to", + new String[]{"text5", "text4", "text2", "text3", "text1"})); + Object[] fields = new Object[] {null, "NULL", "null", "'null'", ""}; + + Object[] actual = matcher.getMatchingData(fields); + assertArrayEquals(new Object[] {null, null, null, null, null}, actual); + } + + /** + * If TO schema has more fields than FROM schema, and all of the extra fields + * are "nullable", their values will be set to null. 
+ */ + @Test + public void testConvertWhenToSchemaIsLongerThanFromSchema() { + matcher = new NameMatcher( + SchemaFixture.createSchema("from", + new String[]{"text1", "text2"}), + SchemaFixture.createSchema("to", + new String[]{"text3", "text1", "text2"})); + Object[] fields = new Object[] {"t1", "t2"}; + + Object[] actual = matcher.getMatchingData(fields); + assertArrayEquals(new Object[] {null, "t1", "t2"}, actual); + } + + /** + * If TO schema has more fields than FROM schema, and NOT all of the extra + * fields are "nullable", a SqoopException is expected. + */ + @Test (expected = SqoopException.class) + public void testConvertWhenToSchemaIsLongerThanFromSchemaFail() { + Schema from = SchemaFixture.createSchema("from", + new String[]{"text1", "text2"}); + Schema to = SchemaFixture.createSchema("to", + new String[]{"text4", "text3", "text2", "text1"}); + to.getColumnsList().get(0).setNullable(true); + to.getColumnsList().get(1).setNullable(false); + matcher = new NameMatcher(from, to); + Object[] fields = new Object[] {"t1", "t2"}; + + matcher.getMatchingData(fields); + } + +} \ No newline at end of file
