Repository: sqoop Updated Branches: refs/heads/sqoop2 100810be4 -> 3edf9cad6
SQOOP-1621: Sqoop2: Allow null as a dummy Schema (Veena Basavaraj via Abraham Elmahrek) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/3edf9cad Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/3edf9cad Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/3edf9cad Branch: refs/heads/sqoop2 Commit: 3edf9cad6f77cbcd9e9baf672f715fd99bedd8c0 Parents: 100810b Author: Abraham Elmahrek <[email protected]> Authored: Wed Nov 12 20:15:45 2014 -0800 Committer: Abraham Elmahrek <[email protected]> Committed: Wed Nov 12 20:15:45 2014 -0800 ---------------------------------------------------------------------- .../sqoop/json/util/SchemaSerialization.java | 30 ++++++++++-------- .../org/apache/sqoop/schema/NullSchema.java | 32 ++++++++++++++++++++ .../json/util/TestSchemaSerialization.java | 23 ++++++++++---- .../connector/hdfs/HdfsFromInitializer.java | 7 ----- .../sqoop/connector/hdfs/HdfsToInitializer.java | 7 ----- docs/src/site/sphinx/ConnectorDevelopment.rst | 12 ++++++-- .../sqoop/job/mr/MRConfigurationUtils.java | 2 -- .../sqoop/job/mr/TestMRConfigurationUtils.java | 10 +++--- .../org/apache/sqoop/job/etl/Initializer.java | 8 +++-- 9 files changed, 87 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/3edf9cad/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java b/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java index 9ab87c5..6dc93d5 100644 --- a/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java +++ b/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java @@ -17,6 +17,7 @@ */ package org.apache.sqoop.json.util; +import org.apache.sqoop.schema.NullSchema; 
import org.apache.sqoop.schema.Schema; import org.apache.sqoop.schema.type.AbstractComplexListType; import org.apache.sqoop.schema.type.AbstractPrimitiveType; @@ -76,31 +77,36 @@ public class SchemaSerialization { @SuppressWarnings("unchecked") public static JSONObject extractSchema(Schema schema) { JSONObject object = new JSONObject(); - object.put(NAME, schema.getName()); - object.put(CREATION_DATE, schema.getCreationDate().getTime()); - if (schema.getNote() != null) { - object.put(NOTE, schema.getNote()); - } - JSONArray columnArray = new JSONArray(); - for (Column column : schema.getColumns()) { - columnArray.add(extractColumn(column)); + // defensive check: a null schema serializes to an empty JSON object + if (schema != null) { + object.put(NAME, schema.getName()); + object.put(CREATION_DATE, schema.getCreationDate().getTime()); + if (schema.getNote() != null) { + object.put(NOTE, schema.getNote()); + } + JSONArray columnArray = new JSONArray(); + for (Column column : schema.getColumns()) { + columnArray.add(extractColumn(column)); + } + object.put(COLUMNS, columnArray); } - object.put(COLUMNS, columnArray); return object; } public static Schema restoreSchema(JSONObject jsonObject) { + // a null or empty JSON object is restored as the NullSchema singleton + if (jsonObject == null || jsonObject.isEmpty()) { + return NullSchema.getInstance(); + } String name = (String) jsonObject.get(NAME); String note = (String) jsonObject.get(NOTE); java.util.Date date = new java.util.Date((Long) jsonObject.get(CREATION_DATE)); Schema schema = new Schema(name).setNote(note).setCreationDate(date); - JSONArray columnsArray = (JSONArray) jsonObject.get(COLUMNS); for (Object obj : columnsArray) { schema.addColumn(restoreColumn((JSONObject) obj)); } - return schema; } @@ -158,7 +164,6 @@ public class SchemaSerialization { break; case DATE: case BIT: - // Nothing to do extra break; default: // TODO(jarcec): Throw an exception of unsupported type?
@@ -193,7 +198,6 @@ public class SchemaSerialization { } arraySize = (Long) list.get(SIZE); } - ColumnType type = ColumnType.valueOf((String) obj.get(TYPE)); Column output = null; switch (type) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/3edf9cad/common/src/main/java/org/apache/sqoop/schema/NullSchema.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/schema/NullSchema.java b/common/src/main/java/org/apache/sqoop/schema/NullSchema.java new file mode 100644 index 0000000..c3393a6 --- /dev/null +++ b/common/src/main/java/org/apache/sqoop/schema/NullSchema.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.schema; + +public class NullSchema extends Schema { + + public static final NullSchema instance = new NullSchema(); + + public static NullSchema getInstance() { + return instance; + } + + private NullSchema() { + // empty string is the name + super(""); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/3edf9cad/common/src/test/java/org/apache/sqoop/json/util/TestSchemaSerialization.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/sqoop/json/util/TestSchemaSerialization.java b/common/src/test/java/org/apache/sqoop/json/util/TestSchemaSerialization.java index 1b0f30b..aca67d1 100644 --- a/common/src/test/java/org/apache/sqoop/json/util/TestSchemaSerialization.java +++ b/common/src/test/java/org/apache/sqoop/json/util/TestSchemaSerialization.java @@ -17,6 +17,9 @@ */ package org.apache.sqoop.json.util; +import static org.junit.Assert.assertEquals; + +import org.apache.sqoop.schema.NullSchema; import org.apache.sqoop.schema.Schema; import org.apache.sqoop.schema.type.Array; import org.apache.sqoop.schema.type.Binary; @@ -36,14 +39,25 @@ import org.json.simple.JSONObject; import org.json.simple.JSONValue; import org.junit.Test; -import static org.junit.Assert.assertEquals; - /** * */ public class TestSchemaSerialization { @Test + public void testSchemaNull() { + // a null schema is treated as a NullSchema + JSONObject extractJson = SchemaSerialization.extractSchema(null); + JSONObject restoreJson = (JSONObject) JSONValue.parse(extractJson.toJSONString()); + assertEquals(NullSchema.getInstance(), SchemaSerialization.restoreSchema(restoreJson)); + } + + @Test + public void testNullSchemaObject() { + transferAndAssert(NullSchema.getInstance()); + } + + @Test public void testArray() { // create an array type containing decimals Schema array = new Schema("array").addColumn(new Array("a", new Decimal()).setSize(1L)); @@ -157,8 +171,7 @@ public class 
TestSchemaSerialization { @Test public void testComplex() { Schema complex = new Schema("complex") - .addColumn(new Map(new Text(), new Set(new Array(new Text()))).setName("a")) - ; + .addColumn(new Map(new Text(), new Set(new Array(new Text()))).setName("a")); transferAndAssert(complex); } @@ -169,9 +182,7 @@ public class TestSchemaSerialization { protected Schema transfer(Schema schema) { JSONObject extractJson = SchemaSerialization.extractSchema(schema); - String transferredString = extractJson.toJSONString(); - JSONObject restoreJson = (JSONObject) JSONValue.parse(transferredString); return SchemaSerialization.restoreSchema(restoreJson); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/3edf9cad/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java index 4c6f566..0a95e07 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java @@ -21,7 +21,6 @@ import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; import org.apache.sqoop.job.etl.Initializer; import org.apache.sqoop.job.etl.InitializerContext; -import org.apache.sqoop.schema.Schema; public class HdfsFromInitializer extends Initializer<LinkConfiguration, FromJobConfiguration> { @@ -39,10 +38,4 @@ public class HdfsFromInitializer extends Initializer<LinkConfiguration, FromJobC FromJobConfiguration jobConfig) { // do nothing at this point } - - @Override - public Schema getSchema(InitializerContext context, LinkConfiguration linkConfig, - 
FromJobConfiguration jobConfig) { - return new Schema("HDFS file"); - } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/3edf9cad/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java index bce72b5..991e6c9 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java @@ -21,7 +21,6 @@ import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration; import org.apache.sqoop.job.etl.Initializer; import org.apache.sqoop.job.etl.InitializerContext; -import org.apache.sqoop.schema.Schema; public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfiguration> { /** @@ -38,10 +37,4 @@ public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfi ToJobConfiguration jobConfig) { // do nothing at this point } - - @Override - public Schema getSchema(InitializerContext context, LinkConfiguration linkConfig, - ToJobConfiguration jobConfig) { - return new Schema("HDFS file"); - } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/3edf9cad/docs/src/site/sphinx/ConnectorDevelopment.rst ---------------------------------------------------------------------- diff --git a/docs/src/site/sphinx/ConnectorDevelopment.rst b/docs/src/site/sphinx/ConnectorDevelopment.rst index 58b3b61..e740cd5 100644 --- a/docs/src/site/sphinx/ConnectorDevelopment.rst +++ b/docs/src/site/sphinx/ConnectorDevelopment.rst @@ -115,12 +115,18 @@ Initializer is instantiated before the submission of sqoop job to the execution JobConfiguration 
jobConfiguration); public List<String> getJars(InitializerContext context, LinkConfiguration linkConfiguration, - JobConfiguration jobConfiguration); + JobConfiguration jobConfiguration){ + return new LinkedList<String>(); + } public abstract Schema getSchema(InitializerContext context, LinkConfiguration linkConfiguration, - JobConfiguration jobConfiguration); + JobConfiguration jobConfiguration) { + return NullSchema.getInstance(); + } + +In addition to the initialize() method where the job execution preparation activities occur, the ``Initializer`` can also implement the getSchema() method for the directions ``FROM`` and ``TO`` that it supports. -In addition to the initialize() method where the job execution preparation activities occur, the ``Initializer`` must also implement the getSchema() method for the direction it supports. The getSchema() method is used by the sqoop system to match the data extracted/read by the ``From`` instance of connector data source with the data loaded/written to the ``To`` instance of the connector data source. In case of a relational database or columnar database, the returned Schema object will include collection of columns with their data types. If the data source is schema-less, such as a file, an empty Schema can be returned (i.e a Schema object without any columns). +The getSchema() method is used by the sqoop system to match the data extracted/read by the ``From`` instance of connector data source with the data loaded/written to the ``To`` instance of the connector data source. In case of a relational database or columnar database, the returned Schema object will include collection of columns with their data types. If the data source is schema-less, such as a file, a default ``NullSchema`` will be used (i.e. a Schema object without any columns). NOTE: Sqoop 2 currently does not support extract and load between two connectors that represent schema-less data sources.
We expect that atleast the ``From`` instance of the connector or the ``To`` instance of the connector in the sqoop job will have a schema. If both ``From`` and ``To`` have a associated non empty schema, Sqoop 2 will load data by column name, i.e, data in column "A" in ``From`` instance of the connector for the job will be loaded to column "A" in the ``To`` instance of the connector for that job. http://git-wip-us.apache.org/repos/asf/sqoop/blob/3edf9cad/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java index d5f74f0..fb10e3c 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java @@ -139,7 +139,6 @@ public final class MRConfigurationUtils { * @param schema Schema */ public static void setConnectorSchema(Direction type, Job job, Schema schema) { - if(schema != null) { String jsonSchema = SchemaSerialization.extractSchema(schema).toJSONString(); switch (type) { case FROM: @@ -148,7 +147,6 @@ public final class MRConfigurationUtils { case TO: job.getCredentials().addSecretKey(SCHEMA_TO_KEY, jsonSchema.getBytes()); return; - } } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/3edf9cad/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestMRConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestMRConfigurationUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestMRConfigurationUtils.java index fbe3e7b..70ea6d4 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestMRConfigurationUtils.java +++ 
b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestMRConfigurationUtils.java @@ -18,17 +18,17 @@ package org.apache.sqoop.job.mr; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.sqoop.common.Direction; -import org.apache.sqoop.model.ConfigurationClass; import org.apache.sqoop.model.Config; import org.apache.sqoop.model.ConfigClass; +import org.apache.sqoop.model.ConfigurationClass; import org.apache.sqoop.model.Input; +import org.apache.sqoop.schema.NullSchema; import org.apache.sqoop.schema.Schema; import org.apache.sqoop.schema.type.Text; import org.junit.Before; @@ -100,10 +100,12 @@ public class TestMRConfigurationUtils { @Test public void testConnectorSchemaNull() throws Exception { MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, null); - assertNull(MRConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy)); + assertEquals(NullSchema.getInstance(), + MRConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy)); MRConfigurationUtils.setConnectorSchema(Direction.TO, job, null); - assertNull(MRConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy)); + assertEquals(NullSchema.getInstance(), + MRConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy)); } private Schema getSchema(String name) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/3edf9cad/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java index d66b099..4dd6d5b 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java @@ -20,6 +20,7 
@@ package org.apache.sqoop.job.etl; import java.util.LinkedList; import java.util.List; +import org.apache.sqoop.schema.NullSchema; import org.apache.sqoop.schema.Schema; /** @@ -59,6 +60,7 @@ public abstract class Initializer<LinkConfiguration, JobConfiguration> { /** * Return schema associated with the connector for FROM and TO + * By default the NullSchema singleton is returned. Override this method if there is a custom schema to provide for either FROM or TO * @param context Initializer context object * @param linkConfiguration link configuration object * @param jobConfiguration job configuration object for the FROM and TO @@ -67,7 +69,9 @@ public abstract class Initializer<LinkConfiguration, JobConfiguration> { * @return */ - public abstract Schema getSchema(InitializerContext context, LinkConfiguration linkConfiguration, - JobConfiguration jobConfiguration); + public Schema getSchema(InitializerContext context, LinkConfiguration linkConfiguration, + JobConfiguration jobConfiguration) { + return NullSchema.getInstance(); + } }
