Repository: sqoop
Updated Branches:
  refs/heads/trunk d006bc751 -> c0b9bc435
SQOOP-3068: Enhance error (tool.ImportTool: Encountered IOException running import job: java.io.IOException: Expected schema) to suggest workaround (--map-column-java)

(Szabolcs Vasas via Attila Szabo)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/c0b9bc43
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/c0b9bc43
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/c0b9bc43

Branch: refs/heads/trunk
Commit: c0b9bc435396600b7b8798190c67ce77aa81a02c
Parents: d006bc7
Author: Attila Szabo <[email protected]>
Authored: Wed Jan 11 12:50:28 2017 +0100
Committer: Attila Szabo <[email protected]>
Committed: Wed Jan 11 12:50:28 2017 +0100

----------------------------------------------------------------------
 .../sqoop/avro/AvroSchemaMismatchException.java |  50 +++++++
 .../org/apache/sqoop/mapreduce/ParquetJob.java  |  36 ++++-
 src/java/org/apache/sqoop/tool/ImportTool.java  |  21 ++-
 .../com/cloudera/sqoop/hive/TestHiveImport.java |  43 ++++++
 .../org/apache/sqoop/tool/TestImportTool.java   |  43 ++++++
 .../apache/sqoop/util/ExpectedLogMessage.java   | 147 +++++++++++++++++++
 6 files changed, 329 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b9bc43/src/java/org/apache/sqoop/avro/AvroSchemaMismatchException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/avro/AvroSchemaMismatchException.java b/src/java/org/apache/sqoop/avro/AvroSchemaMismatchException.java
new file mode 100644
index 0000000..4070627
--- /dev/null
+++ b/src/java/org/apache/sqoop/avro/AvroSchemaMismatchException.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.avro;
+
+import org.apache.avro.Schema;
+
+/**
+ * This exception will be thrown when Sqoop tries to write to a dataset
+ * and the Avro schema which was used when the dataset was created does not match
+ * the actual schema which is used by Sqoop during the write operation.
+ */
+public class AvroSchemaMismatchException extends RuntimeException {
+
+  static final String MESSAGE_TEMPLATE = "%s%nExpected schema: %s%nActual schema: %s";
+
+  private final Schema writtenWithSchema;
+
+  private final Schema actualSchema;
+
+  public AvroSchemaMismatchException(String message, Schema writtenWithSchema, Schema actualSchema) {
+    super(String.format(MESSAGE_TEMPLATE, message, writtenWithSchema.toString(), actualSchema.toString()));
+    this.writtenWithSchema = writtenWithSchema;
+    this.actualSchema = actualSchema;
+  }
+
+  public Schema getWrittenWithSchema() {
+    return writtenWithSchema;
+  }
+
+  public Schema getActualSchema() {
+    return actualSchema;
+  }
+
+}
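For illustration only (not part of this patch): constructing the new exception with two hand-built schemas shows what MESSAGE_TEMPLATE produces. The record and field names below are invented.

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.sqoop.avro.AvroSchemaMismatchException;

    public class AvroSchemaMismatchExample {
      public static void main(String[] args) {
        // Schema the dataset was created with (date column stored as string).
        Schema writtenWith = SchemaBuilder.record("example_table").fields()
            .name("created_at").type().nullable().stringType().noDefault()
            .endRecord();
        // Schema generated for the current write (date column mapped to long).
        Schema actual = SchemaBuilder.record("example_table").fields()
            .name("created_at").type().nullable().longType().noDefault()
            .endRecord();

        AvroSchemaMismatchException e = new AvroSchemaMismatchException(
            "Target dataset was created with an incompatible Avro schema. ",
            writtenWith, actual);
        // getMessage() yields the message plus "Expected schema: ..." and
        // "Actual schema: ..." on separate lines.
        System.out.println(e.getMessage());
      }
    }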
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b9bc43/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
index b077d9b..4604773 100644
--- a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.sqoop.avro.AvroSchemaMismatchException;
 import org.apache.sqoop.hive.HiveConfig;
 import org.kitesdk.data.CompressionType;
 import org.kitesdk.data.Dataset;
@@ -48,12 +49,23 @@ public final class ParquetJob {
 
   public static final Log LOG = LogFactory.getLog(ParquetJob.class.getName());
 
   public static final String HIVE_METASTORE_CLIENT_CLASS = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient";
+  public static final String HIVE_METASTORE_SASL_ENABLED = "hive.metastore.sasl.enabled";
   // Purposefully choosing the same token alias as the one Oozie chooses.
   // Make sure we don't generate a new delegation token if oozie
   // has already generated one.
   public static final String HIVE_METASTORE_TOKEN_ALIAS = "HCat Token";
 
+  public static final String INCOMPATIBLE_AVRO_SCHEMA_MSG = "Target dataset was created with an incompatible Avro schema. ";
+
+  public static final String HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG = "You tried to import to an already existing Hive table in " +
+      "Parquet format. Sqoop maps date/timestamp SQL types to int/bigint Hive types during Hive Parquet import" +
+      " but it is possible that date/timestamp types were mapped to strings during table" +
+      " creation. Consider using Sqoop option --map-column-java to resolve the mismatch" +
+      " (e.g. --map-column-java date_field1=String,timestamp_field1=String).";
+
+  private static final String HIVE_URI_PREFIX = "dataset:hive";
+
   private ParquetJob() {
   }
 
@@ -91,7 +103,7 @@ public final class ParquetJob {
     Dataset dataset;
 
     // Add hive delegation token only if we don't already have one.
-    if (uri.startsWith("dataset:hive")) {
+    if (isHiveImport(uri)) {
       Configuration hiveConf = HiveConfig.getHiveConf(conf);
       if (isSecureMetastore(hiveConf)) {
         // Copy hive configs to job config
@@ -111,9 +123,8 @@
       dataset = Datasets.load(uri);
       Schema writtenWith = dataset.getDescriptor().getSchema();
       if (!SchemaValidationUtil.canRead(writtenWith, schema)) {
-        throw new IOException(
-            String.format("Expected schema: %s%nActual schema: %s",
-                writtenWith, schema));
+        String exceptionMessage = buildAvroSchemaMismatchMessage(isHiveImport(uri));
+        throw new AvroSchemaMismatchException(exceptionMessage, writtenWith, schema);
       }
     } else {
       dataset = createDataset(schema, getCompressionType(conf), uri);
@@ -131,7 +142,11 @@
     }
   }
 
-  private static Dataset createDataset(Schema schema,
+  private static boolean isHiveImport(String importUri) {
+    return importUri.startsWith(HIVE_URI_PREFIX);
+  }
+
+  public static Dataset createDataset(Schema schema,
       CompressionType compressionType, String uri) {
     DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
         .schema(schema)
@@ -191,4 +206,15 @@
       throw new RuntimeException("Couldn't fetch delegation token.", ex);
     }
   }
+
+  private static String buildAvroSchemaMismatchMessage(boolean hiveImport) {
+    String exceptionMessage = INCOMPATIBLE_AVRO_SCHEMA_MSG;
+
+    if (hiveImport) {
+      exceptionMessage += HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG;
+    }
+
+    return exceptionMessage;
+  }
+
 }
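The check that gates the new exception is Kite's SchemaValidationUtil.canRead(writtenWith, schema). A rough stand-in using plain Avro's SchemaCompatibility API, handy for inspecting schemas outside Sqoop, could look like this. Note that SchemaCompatibility is a substitute for the Kite helper, not what the patch calls, and the schemas are placeholders.

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.SchemaCompatibility;

    public class CanReadSketch {
      public static void main(String[] args) {
        Schema writtenWith = SchemaBuilder.record("t").fields()
            .name("c1").type().stringType().noDefault()
            .endRecord();
        Schema importSchema = SchemaBuilder.record("t").fields()
            .name("c1").type().intType().noDefault()
            .endRecord();
        // Asks: can records written with writtenWith be read using importSchema?
        // An int reader field cannot read string data, so this prints INCOMPATIBLE.
        System.out.println(SchemaCompatibility
            .checkReaderWriterCompatibility(importSchema, writtenWith)
            .getType());
      }
    }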
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b9bc43/src/java/org/apache/sqoop/tool/ImportTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java
index ed951ea..258ef79 100644
--- a/src/java/org/apache/sqoop/tool/ImportTool.java
+++ b/src/java/org/apache/sqoop/tool/ImportTool.java
@@ -52,7 +52,7 @@
 import com.cloudera.sqoop.metastore.JobStorage;
 import com.cloudera.sqoop.metastore.JobStorageFactory;
 import com.cloudera.sqoop.util.AppendUtils;
 import com.cloudera.sqoop.util.ImportException;
-import org.apache.sqoop.manager.SupportedManagers;
+import org.apache.sqoop.avro.AvroSchemaMismatchException;
 
 import static org.apache.sqoop.manager.SupportedManagers.MYSQL;
@@ -63,6 +63,8 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
 
   public static final Log LOG = LogFactory.getLog(ImportTool.class.getName());
 
+  private static final String IMPORT_FAILED_ERROR_MSG = "Import failed: ";
+
   private CodeGenTool codeGenerator;
 
   // true if this is an all-tables import. Set by a subclass which
@@ -81,8 +83,12 @@
   }
 
   public ImportTool(String toolName, boolean allTables) {
+    this(toolName, new CodeGenTool(), allTables);
+  }
+
+  public ImportTool(String toolName, CodeGenTool codeGenerator, boolean allTables) {
     super(toolName);
-    this.codeGenerator = new CodeGenTool();
+    this.codeGenerator = codeGenerator;
     this.allTables = allTables;
   }
 
@@ -616,18 +622,21 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
         // Import a single table (or query) the user specified.
         importTable(options, options.getTableName(), hiveImport);
       } catch (IllegalArgumentException iea) {
-        LOG.error("Imported Failed: " + iea.getMessage());
+        LOG.error(IMPORT_FAILED_ERROR_MSG + iea.getMessage());
         rethrowIfRequired(options, iea);
        return 1;
       } catch (IOException ioe) {
-        LOG.error("Encountered IOException running import job: "
-            + StringUtils.stringifyException(ioe));
+        LOG.error(IMPORT_FAILED_ERROR_MSG + StringUtils.stringifyException(ioe));
         rethrowIfRequired(options, ioe);
         return 1;
       } catch (ImportException ie) {
-        LOG.error("Error during import: " + ie.toString());
+        LOG.error(IMPORT_FAILED_ERROR_MSG + ie.toString());
         rethrowIfRequired(options, ie);
         return 1;
+      } catch (AvroSchemaMismatchException e) {
+        LOG.error(IMPORT_FAILED_ERROR_MSG, e);
+        rethrowIfRequired(options, e);
+        return 1;
       } finally {
         destroy(options);
       }
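One detail worth noting in the new catch block: it passes the exception object itself to LOG.error instead of appending its message. With commons-logging backed by log4j, that attaches the throwable to the logging event, which is exactly what the ExpectedLogMessage rule further down inspects. A small illustrative sketch (the class name and exception text are invented):

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    public class LoggingStyleSketch {
      public static void main(String[] args) {
        Log log = LogFactory.getLog(LoggingStyleSketch.class);
        RuntimeException e = new RuntimeException("Expected schema: a, Actual schema: b");
        // Only the concatenated message text becomes part of the event:
        log.error("Import failed: " + e.getMessage());
        // The throwable itself is attached to the event, so appenders (and the
        // ExpectedLogMessage rule below) can see its message and stack trace:
        log.error("Import failed: ", e);
      }
    }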
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b9bc43/src/test/com/cloudera/sqoop/hive/TestHiveImport.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/hive/TestHiveImport.java b/src/test/com/cloudera/sqoop/hive/TestHiveImport.java
index 6f488ab..1253e8d 100644
--- a/src/test/com/cloudera/sqoop/hive/TestHiveImport.java
+++ b/src/test/com/cloudera/sqoop/hive/TestHiveImport.java
@@ -26,13 +26,18 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import com.cloudera.sqoop.Sqoop;
 import junit.framework.JUnit4TestAdapter;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.avro.AvroSchemaMismatchException;
+import org.apache.sqoop.mapreduce.ParquetJob;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -54,6 +59,8 @@ import org.junit.runners.JUnit4;
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.DatasetReader;
 import org.kitesdk.data.Datasets;
+import org.kitesdk.data.Formats;
+import org.kitesdk.data.spi.DefaultConfiguration;
 
 /**
  * Test HiveImport capability after an import to HDFS.
@@ -388,6 +395,42 @@ public class TestHiveImport extends ImportJobTestCase {
     verifyHiveDataset(TABLE_NAME, new Object[][] {{"test2", 24, "somestring2"}});
   }
 
+  @Test
+  public void testHiveImportAsParquetWhenTableExistsWithIncompatibleSchema() throws Exception {
+    final String TABLE_NAME = "HIVE_IMPORT_AS_PARQUET_EXISTING_TABLE";
+    setCurTableName(TABLE_NAME);
+    setNumCols(3);
+
+    String [] types = { "VARCHAR(32)", "INTEGER", "DATE" };
+    String [] vals = { "'test'", "42", "'2009-12-31'" };
+    String [] extraArgs = {"--as-parquetfile"};
+
+    createHiveDataSet(TABLE_NAME);
+
+    createTableWithColTypes(types, vals);
+
+    thrown.expect(AvroSchemaMismatchException.class);
+    thrown.expectMessage(ParquetJob.INCOMPATIBLE_AVRO_SCHEMA_MSG + ParquetJob.HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG);
+
+    SqoopOptions sqoopOptions = getSqoopOptions(getConf());
+    sqoopOptions.setThrowOnError(true);
+    Sqoop sqoop = new Sqoop(new ImportTool(), getConf(), sqoopOptions);
+    sqoop.run(getArgv(false, extraArgs));
+
+  }
+
+  private void createHiveDataSet(String tableName) {
+    Schema dataSetSchema = SchemaBuilder
+        .record(tableName)
+        .fields()
+        .name(getColName(0)).type().nullable().stringType().noDefault()
+        .name(getColName(1)).type().nullable().stringType().noDefault()
+        .name(getColName(2)).type().nullable().stringType().noDefault()
+        .endRecord();
+    String dataSetUri = "dataset:hive:/default/" + tableName;
+    ParquetJob.createDataset(dataSetSchema, Formats.PARQUET.getDefaultCompressionType(), dataSetUri);
+  }
+
   /**
    * Test that records are appended to an existing table.
    */
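For reference, the workaround the new message suggests would map the DATE column of a table like this test's to String, so that the Avro schema Sqoop generates matches an all-string dataset. A hypothetical argument list follows; the connect string and the column name are assumptions, not values taken from the test.

    public class WorkaroundArgsSketch {
      // Hypothetical only: arguments mirroring the workaround the message suggests.
      public static final String[] ARGS = {
          "--connect", "jdbc:hsqldb:mem:example",  // assumed connect string
          "--table", "HIVE_IMPORT_AS_PARQUET_EXISTING_TABLE",
          "--hive-import",
          "--as-parquetfile",
          "--map-column-java", "DATA_COL2=String", // assumed column name
      };
    }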
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b9bc43/src/test/org/apache/sqoop/tool/TestImportTool.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/tool/TestImportTool.java b/src/test/org/apache/sqoop/tool/TestImportTool.java
index 4136e9f..7e11f54 100644
--- a/src/test/org/apache/sqoop/tool/TestImportTool.java
+++ b/src/test/org/apache/sqoop/tool/TestImportTool.java
@@ -20,11 +20,25 @@ package org.apache.sqoop.tool;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 import java.sql.Connection;
 
+import com.cloudera.sqoop.hive.HiveImport;
+import org.apache.avro.Schema;
 import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.avro.AvroSchemaMismatchException;
+import org.apache.sqoop.util.ExpectedLogMessage;
+import org.junit.Rule;
+import org.junit.Test;
 import org.junit.experimental.theories.DataPoints;
 import org.junit.experimental.theories.Theories;
 import org.junit.experimental.theories.Theory;
@@ -41,6 +55,9 @@ public class TestImportTool {
       {"TRANSACTION_SERIALIZABLE",Connection.TRANSACTION_SERIALIZABLE}
   };
 
+  @Rule
+  public ExpectedLogMessage logMessage = new ExpectedLogMessage();
+
   @Theory
   public void esnureTransactionIsolationLevelsAreMappedToTheRightValues(Object[] values)
       throws Exception {
@@ -50,4 +67,30 @@
     assertThat(options.getMetadataTransactionIsolationLevel(), is(equalTo(values[1])));
   }
 
+  @Test
+  public void testImportToolHandlesAvroSchemaMismatchExceptionProperly() throws Exception {
+    final String writtenWithSchemaString = "writtenWithSchema";
+    final String actualSchemaString = "actualSchema";
+    final String errorMessage = "Import failed";
+
+    ImportTool importTool = spy(new ImportTool("import", mock(CodeGenTool.class), false));
+
+    doReturn(true).when(importTool).init(any(com.cloudera.sqoop.SqoopOptions.class));
+
+    Schema writtenWithSchema = mock(Schema.class);
+    when(writtenWithSchema.toString()).thenReturn(writtenWithSchemaString);
+    Schema actualSchema = mock(Schema.class);
+    when(actualSchema.toString()).thenReturn(actualSchemaString);
+
+    AvroSchemaMismatchException expectedException = new AvroSchemaMismatchException(errorMessage, writtenWithSchema, actualSchema);
+    doThrow(expectedException).when(importTool).importTable(any(com.cloudera.sqoop.SqoopOptions.class), anyString(), any(HiveImport.class));
+
+    com.cloudera.sqoop.SqoopOptions sqoopOptions = mock(com.cloudera.sqoop.SqoopOptions.class);
+    when(sqoopOptions.doHiveImport()).thenReturn(true);
+
+    logMessage.expectError(expectedException.getMessage());
+    int result = importTool.run(sqoopOptions);
+    assertEquals(1, result);
+  }
+
 }
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b9bc43/src/test/org/apache/sqoop/util/ExpectedLogMessage.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/util/ExpectedLogMessage.java b/src/test/org/apache/sqoop/util/ExpectedLogMessage.java
new file mode 100644
index 0000000..0372fe2
--- /dev/null
+++ b/src/test/org/apache/sqoop/util/ExpectedLogMessage.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.util;
+
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.log4j.spi.ThrowableInformation;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+import org.mockito.ArgumentCaptor;
+
+import static org.apache.commons.lang.StringUtils.EMPTY;
+import static org.apache.commons.lang.StringUtils.contains;
+import static org.apache.commons.lang.StringUtils.defaultString;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class ExpectedLogMessage implements TestRule {
+
+  private static class LoggingEventMatcher extends TypeSafeMatcher<LoggingEvent> {
+
+    private final String msg;
+
+    private final Level level;
+
+    private LoggingEventMatcher(String msg, Level level) {
+      this.msg = msg;
+      this.level = level;
+    }
+
+    @Override
+    public boolean matchesSafely(LoggingEvent o) {
+      return contains(extractEventMessage(o), msg) && level.equals(o.getLevel());
+    }
+
+    @Override
+    public void describeTo(org.hamcrest.Description description) {
+      description.appendText(eventToString(msg, level));
+    }
+
+    @Override
+    protected void describeMismatchSafely(LoggingEvent item, org.hamcrest.Description mismatchDescription) {
+      mismatchDescription.appendText(eventToString(extractEventMessage(item), item.getLevel()));
+    }
+
+    private String extractEventMessage(LoggingEvent item) {
+      final String eventMsg = item.getRenderedMessage();
+      final String exceptionMessage = extractExceptionMessage(item.getThrowableInformation());
+
+      return eventMsg + exceptionMessage;
+    }
+
+    private String extractExceptionMessage(ThrowableInformation throwableInfo) {
+      if (throwableInfo == null) {
+        return EMPTY;
+      }
+
+      Throwable throwable = throwableInfo.getThrowable();
+      if (throwable == null) {
+        return EMPTY;
+      }
+
+      return defaultString(throwable.getMessage());
+    }
+
+    private String eventToString(String msg, Level level) {
+      return "Log entry [ " + msg + ", " + level + " ]";
+    }
+
+  }
+
+  private Matcher<LoggingEvent> loggingEventMatcher;
+
+  @Override
+  public Statement apply(final Statement base, Description description) {
+    return new Statement() {
+      @Override
+      public void evaluate() throws Throwable {
+
+        Logger rootLogger = Logger.getRootLogger();
+        Appender mockAppender = mock(Appender.class);
+        rootLogger.addAppender(mockAppender);
+
+        try {
+          base.evaluate();
+          if (loggingEventMatcher != null) {
+            ArgumentCaptor<LoggingEvent> argumentCaptor = ArgumentCaptor.forClass(LoggingEvent.class);
+            verify(mockAppender, atMost(Integer.MAX_VALUE)).doAppend(argumentCaptor.capture());
+            assertThat(argumentCaptor.getAllValues(), hasItem(loggingEventMatcher));
+          }
+        } finally {
+          rootLogger.removeAppender(mockAppender);
+          loggingEventMatcher = null;
+        }
+      }
+    };
+  }
+
+  public void expectFatal(String msg) {
+    loggingEventMatcher = new LoggingEventMatcher(msg, Level.FATAL);
+  }
+
+  public void expectError(String msg) {
+    loggingEventMatcher = new LoggingEventMatcher(msg, Level.ERROR);
+  }
+
+  public void expectWarn(String msg) {
+    loggingEventMatcher = new LoggingEventMatcher(msg, Level.WARN);
+  }
+
+  public void expectInfo(String msg) {
+    loggingEventMatcher = new LoggingEventMatcher(msg, Level.INFO);
+  }
+
+  public void expectDebug(String msg) {
+    loggingEventMatcher = new LoggingEventMatcher(msg, Level.DEBUG);
+  }
+
+  public void expectTrace(String msg) {
+    loggingEventMatcher = new LoggingEventMatcher(msg, Level.TRACE);
+  }
+
+}
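The rule is self-contained, so other tests can reuse it beyond TestImportTool. A hypothetical usage sketch (the test class and messages are invented):

    import org.apache.log4j.Logger;
    import org.apache.sqoop.util.ExpectedLogMessage;
    import org.junit.Rule;
    import org.junit.Test;

    public class SomeLoggingTest {

      @Rule
      public ExpectedLogMessage logMessage = new ExpectedLogMessage();

      @Test
      public void testWarningIsLogged() {
        // The rule fails the test if no WARN event containing this substring
        // reaches the root logger by the end of the test.
        logMessage.expectWarn("connection retry");
        Logger.getLogger(SomeLoggingTest.class).warn("connection retry #1");
      }
    }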
