Repository: sqoop
Updated Branches:
  refs/heads/trunk 3ccb8f8ff -> 46f9e2d9d
SQOOP-3178: Incremental Merging for Parquet File Format
(Sandish Kumar HN via Anna Szonyi)

Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/46f9e2d9
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/46f9e2d9
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/46f9e2d9

Branch: refs/heads/trunk
Commit: 46f9e2d9d8a6d65b9363ef48f4357f4cb039ea8d
Parents: 3ccb8f8
Author: Anna Szonyi <[email protected]>
Authored: Fri Jul 21 12:23:58 2017 -0700
Committer: Anna Szonyi <[email protected]>
Committed: Fri Jul 21 12:23:58 2017 -0700

----------------------------------------------------------------------
 .../MergeGenericRecordExportMapper.java         | 75 +++++++++++++++++
 .../org/apache/sqoop/mapreduce/MergeJob.java    | 70 +++++++++++++++-
 .../sqoop/mapreduce/MergeParquetMapper.java     | 88 ++++++++++++++++++++
 .../sqoop/mapreduce/MergeParquetReducer.java    | 75 +++++++++++++++++
 src/java/org/apache/sqoop/tool/ImportTool.java  |  8 +-
 src/test/com/cloudera/sqoop/TestMerge.java      | 37 +++++++-
 6 files changed, 347 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/46f9e2d9/src/java/org/apache/sqoop/mapreduce/MergeGenericRecordExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeGenericRecordExportMapper.java b/src/java/org/apache/sqoop/mapreduce/MergeGenericRecordExportMapper.java
new file mode 100644
index 0000000..31d56a5
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/MergeGenericRecordExportMapper.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+public class MergeGenericRecordExportMapper<K, V>
+    extends AutoProgressMapper<K, V, Text, MergeRecord> {
+
+  protected MapWritable columnTypes = new MapWritable();
+  private String keyColName;
+  private boolean isNewDatasetSplit;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    keyColName = conf.get(MergeJob.MERGE_KEY_COL_KEY);
+
+    InputSplit inputSplit = context.getInputSplit();
+    FileSplit fileSplit = (FileSplit) inputSplit;
+    Path splitPath = fileSplit.getPath();
+
+    if (splitPath.toString().startsWith(conf.get(MergeJob.MERGE_NEW_PATH_KEY))) {
+      this.isNewDatasetSplit = true;
+    } else if (splitPath.toString().startsWith(conf.get(MergeJob.MERGE_OLD_PATH_KEY))) {
+      this.isNewDatasetSplit = false;
+    } else {
+      throw new IOException(
+          "File " + splitPath + " is not under new path " + conf.get(MergeJob.MERGE_NEW_PATH_KEY)
+              + " or old path " + conf.get(MergeJob.MERGE_OLD_PATH_KEY));
+    }
+    super.setup(context);
+  }
+
+  protected void processRecord(SqoopRecord sqoopRecord, Context context)
+      throws IOException, InterruptedException {
+    MergeRecord mergeRecord = new MergeRecord(sqoopRecord, isNewDatasetSplit);
+    Map<String, Object> fieldMap = sqoopRecord.getFieldMap();
+    if (null == fieldMap) {
+      throw new IOException("No field map in record " + sqoopRecord);
+    }
+    Object keyObj = fieldMap.get(keyColName);
+    if (null == keyObj) {
+      throw new IOException(
+          "Cannot join values on null key. Did you specify a key column that exists?");
+    } else {
+      context.write(new Text(keyObj.toString()), mergeRecord);
+    }
+  }
+}
\ No newline at end of file
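
The mapper above is what lets one generic mapper serve both sides of a merge: every record is tagged with whether its input split came from the new or the old dataset directory, and the reducer later uses that flag to decide which copy of a row survives. As a standalone illustration of the prefix test in setup(), here is a minimal sketch; the class name and paths are hypothetical, and the real code reads the two directories from MERGE_NEW_PATH_KEY and MERGE_OLD_PATH_KEY in the job configuration:

    public class SplitOriginDemo {

      // Same prefix test as setup() above; true means the split belongs to the
      // freshly imported (new) dataset, false means the existing (old) dataset.
      static boolean isNewDatasetSplit(String splitPath, String newPath, String oldPath) {
        if (splitPath.startsWith(newPath)) {
          return true;
        } else if (splitPath.startsWith(oldPath)) {
          return false;
        }
        throw new IllegalArgumentException(
            "File " + splitPath + " is not under new path " + newPath
                + " or old path " + oldPath);
      }

      public static void main(String[] args) {
        String newPath = "hdfs://nn:8020/user/example/merge_new";  // hypothetical
        String oldPath = "hdfs://nn:8020/user/example/merge_old";  // hypothetical
        System.out.println(isNewDatasetSplit(newPath + "/part-m-00000", newPath, oldPath)); // true
        System.out.println(isNewDatasetSplit(oldPath + "/part-m-00000", newPath, oldPath)); // false
      }
    }
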
" + "Did you specify a key column that exists?"); + } else { + context.write(new Text(keyObj.toString()), mergeRecord); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/46f9e2d9/src/java/org/apache/sqoop/mapreduce/MergeJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/MergeJob.java b/src/java/org/apache/sqoop/mapreduce/MergeJob.java index 8b1cba3..c6be189 100644 --- a/src/java/org/apache/sqoop/mapreduce/MergeJob.java +++ b/src/java/org/apache/sqoop/mapreduce/MergeJob.java @@ -19,19 +19,21 @@ package org.apache.sqoop.mapreduce; import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; import org.apache.avro.Schema; import org.apache.avro.file.DataFileReader; import org.apache.avro.file.FileReader; -import org.apache.avro.file.SeekableInput; -import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumReader; import org.apache.avro.mapred.FsInput; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; @@ -43,6 +45,16 @@ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.sqoop.avro.AvroUtil; import org.apache.sqoop.mapreduce.ExportJobBase.FileType; import org.apache.sqoop.util.Jars; +import org.kitesdk.data.Dataset; +import org.kitesdk.data.DatasetDescriptor; +import org.kitesdk.data.Datasets; +import org.kitesdk.data.Formats; +import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat; +import parquet.avro.AvroParquetInputFormat; +import parquet.avro.AvroSchemaConverter; +import parquet.hadoop.Footer; +import parquet.hadoop.ParquetFileReader; +import parquet.schema.MessageType; import com.cloudera.sqoop.SqoopOptions; import com.cloudera.sqoop.mapreduce.JobBase; @@ -67,6 +79,8 @@ public class MergeJob extends JobBase { */ public static final String MERGE_SQOOP_RECORD_KEY = "sqoop.merge.class"; + public static final String PARQUET_AVRO_SCHEMA = "parquetjob.avro.schema"; + public MergeJob(final SqoopOptions opts) { super(opts, null, null, null); } @@ -130,6 +144,11 @@ public class MergeJob extends JobBase { FileType fileType = ExportJobBase.getFileType(jobConf, oldPath); switch (fileType) { + case PARQUET_FILE: + Path finalPath = new Path(options.getTargetDir()); + finalPath = FileSystemUtil.makeQualified(finalPath, jobConf); + configueParquetMergeJob(jobConf, job, oldPath, newPath, finalPath); + break; case AVRO_DATA_FILE: configueAvroMergeJob(conf, job, oldPath, newPath); break; @@ -179,6 +198,51 @@ public class MergeJob extends JobBase { job.setReducerClass(MergeAvroReducer.class); AvroJob.setOutputSchema(job.getConfiguration(), oldPathSchema); } + + private void configueParquetMergeJob(Configuration conf, Job job, Path oldPath, Path newPath, + Path finalPath) throws IOException { + try { + FileSystem fileSystem = finalPath.getFileSystem(conf); + LOG.info("Trying to merge parquet files"); + job.setOutputKeyClass(org.apache.avro.generic.GenericRecord.class); + job.setMapperClass(MergeParquetMapper.class); + job.setReducerClass(MergeParquetReducer.class); + job.setOutputValueClass(NullWritable.class); + + List<Footer> 
footers = new ArrayList<Footer>(); + FileStatus oldPathfileStatus = fileSystem.getFileStatus(oldPath); + FileStatus newPathfileStatus = fileSystem.getFileStatus(oldPath); + footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), oldPathfileStatus, true)); + footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), newPathfileStatus, true)); + + MessageType schema = footers.get(0).getParquetMetadata().getFileMetaData().getSchema(); + AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(); + Schema avroSchema = avroSchemaConverter.convert(schema); + + if (!fileSystem.exists(finalPath)) { + Dataset dataset = createDataset(avroSchema, "dataset:" + finalPath); + DatasetKeyOutputFormat.configure(job).overwrite(dataset); + } else { + DatasetKeyOutputFormat.configure(job).overwrite(new URI("dataset:" + finalPath)); + } + + job.setInputFormatClass(AvroParquetInputFormat.class); + AvroParquetInputFormat.setAvroReadSchema(job, avroSchema); + + conf.set(PARQUET_AVRO_SCHEMA, avroSchema.toString()); + Class<DatasetKeyOutputFormat> outClass = DatasetKeyOutputFormat.class; + + job.setOutputFormatClass(outClass); + } catch (Exception cnfe) { + throw new IOException(cnfe); + } + } + + public static Dataset createDataset(Schema schema, String uri) { + DatasetDescriptor descriptor = + new DatasetDescriptor.Builder().schema(schema).format(Formats.PARQUET).build(); + return Datasets.create(uri, descriptor, GenericRecord.class); + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/46f9e2d9/src/java/org/apache/sqoop/mapreduce/MergeParquetMapper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/MergeParquetMapper.java b/src/java/org/apache/sqoop/mapreduce/MergeParquetMapper.java new file mode 100644 index 0000000..8a5a7ca --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/MergeParquetMapper.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
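
Note that runMergeJob() selects the Parquet code path purely from the file type detected on the old dataset, so no new command-line option is involved. For orientation, a minimal driver sketch, assuming the standard SqoopOptions merge accessors (setMergeOldPath, setMergeNewPath, setMergeKeyCol, setClassName) and a previously code-generated record class; all paths and names here are hypothetical:

    import com.cloudera.sqoop.SqoopOptions;
    import org.apache.sqoop.mapreduce.MergeJob;

    public class ParquetMergeDriver {
      public static void main(String[] args) throws Exception {
        SqoopOptions options = new SqoopOptions();
        options.setMergeOldPath("merge_old");   // existing Parquet dataset (hypothetical)
        options.setMergeNewPath("merge_new");   // incremental delta (hypothetical)
        options.setTargetDir("merge_final");    // merged Parquet dataset lands here
        options.setMergeKeyCol("ID");           // column used as the reduce key
        options.setClassName("MergeTable");     // code-generated SqoopRecord subclass
        MergeJob mergeJob = new MergeJob(options);
        if (!mergeJob.runMergeJob()) {
          System.err.println("MergeJob failed.");
          System.exit(1);
        }
      }
    }
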

http://git-wip-us.apache.org/repos/asf/sqoop/blob/46f9e2d9/src/java/org/apache/sqoop/mapreduce/MergeParquetMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeParquetMapper.java b/src/java/org/apache/sqoop/mapreduce/MergeParquetMapper.java
new file mode 100644
index 0000000..8a5a7ca
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/MergeParquetMapper.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.avro.mapred.Pair;
+
+import org.apache.sqoop.avro.AvroUtil;
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+
+public class MergeParquetMapper
+    extends MergeGenericRecordExportMapper<GenericRecord, GenericRecord> {
+
+  private Map<String, Pair<String, String>> sqoopRecordFields = new HashMap<String, Pair<String, String>>();
+  private SqoopRecord sqoopRecordImpl;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Configuration conf = context.getConfiguration();
+    final String userClassName = conf.get(MergeJob.MERGE_SQOOP_RECORD_KEY);
+    try {
+      final Class<? extends Object> clazz = Class.forName(userClassName, true,
+          Thread.currentThread().getContextClassLoader());
+      sqoopRecordImpl = (SqoopRecord) ReflectionUtils.newInstance(clazz, conf);
+      for (final Field field : clazz.getDeclaredFields()) {
+        final String fieldName = field.getName();
+        final String fieldTypeName = field.getType().getName();
+        sqoopRecordFields.put(fieldName.toLowerCase(), new Pair<String, String>(fieldName,
+            fieldTypeName));
+      }
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Cannot find the user record class with class name "
+          + userClassName, e);
+    }
+  }
+
+  @Override
+  protected void map(GenericRecord key, GenericRecord val, Context context)
+      throws IOException, InterruptedException {
+    processRecord(toSqoopRecord(val), context);
+  }
+
+  private SqoopRecord toSqoopRecord(GenericRecord genericRecord) throws IOException {
+    Schema avroSchema = genericRecord.getSchema();
+    for (Schema.Field field : avroSchema.getFields()) {
+      Pair<String, String> sqoopRecordField = sqoopRecordFields.get(field.name().toLowerCase());
+      if (null == sqoopRecordField) {
+        throw new IOException("Cannot find field '" + field.name() + "' in fields of user class "
+            + sqoopRecordImpl.getClass().getName() + ". Fields are: "
+            + Arrays.deepToString(sqoopRecordFields.values().toArray()));
+      }
+      Object avroObject = genericRecord.get(field.name());
+      Object fieldVal = AvroUtil.fromAvro(avroObject, field.schema(), sqoopRecordField.value());
+      sqoopRecordImpl.setField(sqoopRecordField.key(), fieldVal);
+    }
+    return sqoopRecordImpl;
+  }
+
+}
\ No newline at end of file
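
The mapper's reflection logic matches Avro schema fields to the generated class's Java fields case-insensitively, which is what makes toSqoopRecord() tolerant of Avro's lower-cased field names. A standalone sketch of that lookup idea follows; the Person class is a hypothetical stand-in for a code-generated SqoopRecord:

    import java.lang.reflect.Field;
    import java.util.HashMap;
    import java.util.Map;

    public class FieldLookupDemo {

      // Hypothetical stand-in for a generated record class with upper-cased fields.
      static class Person {
        Integer ID;
        String NAME;
      }

      public static void main(String[] args) {
        // Index the declared Java fields by their lower-cased names, as setup() does.
        Map<String, String> byLowerCase = new HashMap<String, String>();
        for (Field f : Person.class.getDeclaredFields()) {
          byLowerCase.put(f.getName().toLowerCase(), f.getName());
        }
        // An Avro schema may carry lower-cased names such as "id"; the map
        // recovers the original Java field name to pass to setField().
        System.out.println(byLowerCase.get("id"));    // prints: ID
        System.out.println(byLowerCase.get("name"));  // prints: NAME
      }
    }
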

http://git-wip-us.apache.org/repos/asf/sqoop/blob/46f9e2d9/src/java/org/apache/sqoop/mapreduce/MergeParquetReducer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeParquetReducer.java b/src/java/org/apache/sqoop/mapreduce/MergeParquetReducer.java
new file mode 100644
index 0000000..293ffc9
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/MergeParquetReducer.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.sqoop.avro.AvroUtil;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+
+public class MergeParquetReducer extends Reducer<Text, MergeRecord, GenericRecord, NullWritable> {
+
+  private Schema schema = null;
+  private boolean bigDecimalFormatString = true;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    schema = new Schema.Parser().parse(context.getConfiguration().get(MergeJob.PARQUET_AVRO_SCHEMA));
+    bigDecimalFormatString = context.getConfiguration().getBoolean(
+        ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT, ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
+  }
+
+  @Override
+  public void reduce(Text key, Iterable<MergeRecord> vals, Context context)
+      throws IOException, InterruptedException {
+    SqoopRecord bestRecord = null;
+    try {
+      for (MergeRecord mergeRecord : vals) {
+        if (null == bestRecord && !mergeRecord.isNewRecord()) {
+          // Use an old record if we don't have a new record.
+          bestRecord = (SqoopRecord) mergeRecord.getSqoopRecord().clone();
+        } else if (mergeRecord.isNewRecord()) {
+          bestRecord = (SqoopRecord) mergeRecord.getSqoopRecord().clone();
+        }
+      }
+    } catch (CloneNotSupportedException cnse) {
+      throw new IOException(cnse);
+    }
+
+    if (null != bestRecord) {
+      GenericRecord outKey = AvroUtil.toGenericRecord(bestRecord.getFieldMap(), schema,
+          bigDecimalFormatString);
+      context.write(outKey, null);
+    }
+  }
+}
\ No newline at end of file
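
The reduce() method implements a simple precedence rule per merge key: a record from the new dataset always replaces a record from the old one, and an old record survives only until a new one appears in the group. The same rule in isolation, with a toy stand-in for MergeRecord:

    import java.util.Arrays;
    import java.util.List;

    public class PrecedenceDemo {

      // Toy stand-in for MergeRecord: a payload plus its new/old dataset tag.
      static class Rec {
        final String value;
        final boolean isNew;
        Rec(String value, boolean isNew) {
          this.value = value;
          this.isNew = isNew;
        }
      }

      // Same selection rule as reduce(): any record from the new dataset wins;
      // an old record is kept only while no new record has been seen.
      static Rec pickBest(List<Rec> candidates) {
        Rec best = null;
        for (Rec r : candidates) {
          if (best == null && !r.isNew) {
            best = r;
          } else if (r.isNew) {
            best = r;
          }
        }
        return best;
      }

      public static void main(String[] args) {
        List<Rec> vals = Arrays.asList(new Rec("old row", false), new Rec("updated row", true));
        System.out.println(pickBest(vals).value);  // prints: updated row
      }
    }
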

http://git-wip-us.apache.org/repos/asf/sqoop/blob/46f9e2d9/src/java/org/apache/sqoop/tool/ImportTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java
index 78c7758..807ec8c 100644
--- a/src/java/org/apache/sqoop/tool/ImportTool.java
+++ b/src/java/org/apache/sqoop/tool/ImportTool.java
@@ -48,6 +48,7 @@ import com.cloudera.sqoop.mapreduce.MergeJob;
 import com.cloudera.sqoop.metastore.JobData;
 import com.cloudera.sqoop.metastore.JobStorage;
 import com.cloudera.sqoop.metastore.JobStorageFactory;
+import org.apache.sqoop.orm.ClassWriter;
 import com.cloudera.sqoop.orm.TableClassName;
 import com.cloudera.sqoop.util.AppendUtils;
 import com.cloudera.sqoop.util.ClassLoaderStack;
@@ -460,7 +461,12 @@
       options.setTargetDir(destDir.toString());
 
       // Local job tracker needs jars in the classpath.
-      loadJars(options.getConf(), context.getJarFile(), context.getTableName());
+      if (options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile) {
+        loadJars(options.getConf(), context.getJarFile(),
+            ClassWriter.toJavaIdentifier("codegen_" + context.getTableName()));
+      } else {
+        loadJars(options.getConf(), context.getJarFile(), context.getTableName());
+      }
 
       MergeJob mergeJob = new MergeJob(options);
       if (mergeJob.runMergeJob()) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/46f9e2d9/src/test/com/cloudera/sqoop/TestMerge.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/TestMerge.java b/src/test/com/cloudera/sqoop/TestMerge.java
index 114e934..9639f84 100644
--- a/src/test/com/cloudera/sqoop/TestMerge.java
+++ b/src/test/com/cloudera/sqoop/TestMerge.java
@@ -54,7 +54,11 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.Before;
 import org.junit.Test;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.DatasetReader;
+import org.kitesdk.data.Datasets;
 
+import static org.apache.avro.generic.GenericData.Record;
 import static org.junit.Assert.fail;
 /**
@@ -96,7 +100,7 @@
   }
 
   public static final String TABLE_NAME = "MergeTable";
-  private static final String OLD_PATH = "merge-old";
+  private static final String OLD_PATH = "merge_old";
   private static final String NEW_PATH = "merge_new";
   private static final String FINAL_PATH = "merge_final";
 
@@ -159,6 +163,11 @@
     runMergeTest(SqoopOptions.FileLayout.AvroDataFile);
   }
 
+  @Test
+  public void testParquetFileMerge() throws Exception {
+    runMergeTest(SqoopOptions.FileLayout.ParquetFile);
+  }
+
   public void runMergeTest(SqoopOptions.FileLayout fileLayout) throws Exception {
     createTable(initRecords);
 
@@ -293,6 +302,27 @@
     return false;
   }
 
+  private boolean checkParquetFileForLine(FileSystem fileSystem, Path path, List<Integer> record)
+      throws IOException {
+    Dataset<Record> parquetRecords = Datasets.load("dataset:" + path.getParent(), Record.class);
+    DatasetReader<Record> datasetReader = null;
+    try {
+      datasetReader = parquetRecords.newReader();
+      for (GenericRecord genericRecord : datasetReader) {
+        if (valueMatches(genericRecord, record)) {
+          return true;
+        }
+      }
+    } finally {
+      if (datasetReader != null) {
+        datasetReader.close();
+      }
+    }
+    return false;
+  }
+
   protected boolean checkFileForLine(FileSystem fs, Path p, SqoopOptions.FileLayout fileLayout,
       List<Integer> record) throws IOException {
     boolean result = false;
@@ -303,6 +333,9 @@
       case AvroDataFile:
         result = checkAvroFileForLine(fs, p, record);
         break;
+      case ParquetFile:
+        result = checkParquetFileForLine(fs, p, record);
+        break;
     }
     return result;
   }
@@ -326,7 +359,7 @@
 
     for (FileStatus stat : files) {
       Path p = stat.getPath();
-      if (p.getName().startsWith("part-")) {
+      if (p.getName().startsWith("part-") || p.getName().endsWith(".parquet")) {
         if (checkFileForLine(fs, p, fileLayout, record)) {
           // We found the line. Nothing further to do.
           return true;
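
The test verifies output through the Kite SDK instead of reading part files directly, since DatasetKeyOutputFormat writes a Kite-managed Parquet dataset. A read-back sketch using the same Kite calls as checkParquetFileForLine() above; the dataset URI is hypothetical and assumes a merge result written under a local directory:

    import org.apache.avro.generic.GenericRecord;
    import org.kitesdk.data.Dataset;
    import org.kitesdk.data.DatasetReader;
    import org.kitesdk.data.Datasets;

    public class ReadMergedParquet {
      public static void main(String[] args) {
        // Load the dataset by URI (hypothetical location), as the test does
        // with "dataset:" + path.getParent().
        Dataset<GenericRecord> dataset =
            Datasets.load("dataset:file:/tmp/merge_final", GenericRecord.class);
        DatasetReader<GenericRecord> reader = null;
        try {
          reader = dataset.newReader();
          for (GenericRecord rec : reader) {
            System.out.println(rec);  // one merged row per record
          }
        } finally {
          if (reader != null) {
            reader.close();
          }
        }
      }
    }
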
