[GOBBLIN-396] add date partitioned based json source Closes #2270 from arjun4084346/jsonDatePartitionedSource
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5c678d9b Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5c678d9b Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5c678d9b Branch: refs/heads/0.12.0 Commit: 5c678d9b6b008c9fef0eeea731f2bf19e55e1cea Parents: 457ede2 Author: Arjun <[email protected]> Authored: Thu Feb 8 11:40:19 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Thu Feb 8 11:40:19 2018 -0800 ---------------------------------------------------------------------- .../configuration/ConfigurationKeys.java | 6 + .../JsonRecordAvroSchemaToAvroConverter.java | 11 +- .../source/DatePartitionedJsonFileSource.java | 39 ++++++ .../source/DatePartitionedNestedRetriever.java | 9 +- .../source/PartitionedFileSourceBase.java | 20 +++- .../source/RegexBasedPartitionedRetriever.java | 9 +- .../DatePartitionedJsonFileExtractor.java | 30 +++++ .../source/extractor/SimpleJsonExtractor.java | 118 +++++++++++++++++++ ...JsonRecordAvroSchemaToAvroConverterTest.java | 5 +- .../filebased/FileBasedSourceTest.java | 23 ++++ .../test/resources/source/2017-12/metadata.json | 1 + .../resources/source/2017-12/simplejson.json | 3 + .../test/resources/source/2018-01/metadata.json | 1 + .../resources/source/2018-01/simplejson.json | 3 + .../resources/source/2018-01/simplejson2.json | 3 + 15 files changed, 268 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index 267a17e..d07d740 100644 --- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -302,6 +302,8 @@ public class ConfigurationKeys { public static final String CONVERTER_AVRO_FIELD_PICK_FIELDS = "converter.avro.fields"; public static final String CONVERTER_AVRO_JDBC_ENTRY_FIELDS_PAIRS = "converter.avro.jdbc.entry_fields_pairs"; public static final String CONVERTER_SKIP_FAILED_RECORD = "converter.skipFailedRecord"; + public static final String CONVERTER_AVRO_SCHEMA_KEY = "converter.avroSchema"; + public static final String CONVERTER_IGNORE_FIELDS = "converter.ignoreFields"; /** * Fork operator configuration properties. @@ -452,6 +454,10 @@ public class ConfigurationKeys { * Configuration properties used by the extractor. */ public static final String SOURCE_ENTITY = "source.entity"; + public static final String SCHEMA_IN_SOURCE_DIR = "schema.in.source.dir"; + public static final boolean DEFAULT_SCHEMA_IN_SOURCE_DIR = false; + public static final String SCHEMA_FILENAME = "schema.filename"; + public static final String DEFAULT_SCHEMA_FILENAME = "metadata.json"; // Comma-separated source entity names public static final String SOURCE_ENTITIES = "source.entities"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java index 11f85f4..8e25975 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java @@ -22,6 +22,7 @@ 
import java.util.List; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.converter.Converter; import org.apache.gobblin.converter.DataConversionException; @@ -42,18 +43,12 @@ import com.google.common.base.Splitter; public class JsonRecordAvroSchemaToAvroConverter<SI> extends ToAvroConverterBase<SI, JsonObject> { private static final Splitter SPLITTER_ON_COMMA = Splitter.on(',').trimResults().omitEmptyStrings(); - - public static final String AVRO_SCHEMA_KEY = "converter.avroSchema"; - public static final String IGNORE_FIELDS = "converter.ignoreFields"; - private Schema schema; private List<String> ignoreFields; public ToAvroConverterBase<SI, JsonObject> init(WorkUnitState workUnit) { super.init(workUnit); - Preconditions.checkArgument(workUnit.contains(AVRO_SCHEMA_KEY)); - this.schema = new Schema.Parser().parse(workUnit.getProp(AVRO_SCHEMA_KEY)); - this.ignoreFields = SPLITTER_ON_COMMA.splitToList(workUnit.getProp(IGNORE_FIELDS, "")); + this.ignoreFields = SPLITTER_ON_COMMA.splitToList(workUnit.getProp(ConfigurationKeys.CONVERTER_IGNORE_FIELDS, "")); return this; } @@ -62,6 +57,8 @@ public class JsonRecordAvroSchemaToAvroConverter<SI> extends ToAvroConverterBase */ @Override public Schema convertSchema(SI inputSchema, WorkUnitState workUnit) throws SchemaConversionException { + Preconditions.checkArgument(workUnit.contains(ConfigurationKeys.CONVERTER_AVRO_SCHEMA_KEY)); + this.schema = new Schema.Parser().parse(workUnit.getProp(ConfigurationKeys.CONVERTER_AVRO_SCHEMA_KEY)); return this.schema; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedJsonFileSource.java ---------------------------------------------------------------------- diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedJsonFileSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedJsonFileSource.java new file mode 100644 index 0000000..cc5167a --- /dev/null +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedJsonFileSource.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.source; + +import java.io.IOException; + +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.source.extractor.DatePartitionedJsonFileExtractor; +import org.apache.gobblin.source.extractor.Extractor; + +import com.google.gson.JsonObject; + +public class DatePartitionedJsonFileSource extends PartitionedFileSourceBase<String, JsonObject> { + + public DatePartitionedJsonFileSource() { + super(new DatePartitionedNestedRetriever(".json")); + } + + @Override + public Extractor<String, JsonObject> getExtractor(WorkUnitState state) + throws IOException { + return new DatePartitionedJsonFileExtractor(state); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java index a9ff257..4c33555 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java @@ -72,6 +72,8 @@ public class DatePartitionedNestedRetriever implements PartitionAwareFileRetriev private HadoopFsHelper helper; private final String expectedExtension; private Duration leadTimeDuration; + private boolean schemaInSourceDir; + private String schemaFile; public DatePartitionedNestedRetriever(String expectedExtension) { this.expectedExtension = expectedExtension; @@ -91,6 +93,10 @@ public class DatePartitionedNestedRetriever implements PartitionAwareFileRetriev this.sourceDir = new Path(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY)); this.leadTimeDuration = PartitionAwareFileRetrieverUtils.getLeadTimeDurationFromConfig(state); 
this.helper = new HadoopFsHelper(state); + this.schemaInSourceDir = state.getPropAsBoolean(ConfigurationKeys.SCHEMA_IN_SOURCE_DIR, + ConfigurationKeys.DEFAULT_SCHEMA_IN_SOURCE_DIR); + this.schemaFile = this.schemaInSourceDir ? state.getProp(ConfigurationKeys.SCHEMA_FILENAME, + ConfigurationKeys.DEFAULT_SCHEMA_FILENAME) : ""; } @Override @@ -201,7 +207,8 @@ public class DatePartitionedNestedRetriever implements PartitionAwareFileRetriev return new PathFilter() { @Override public boolean accept(Path path) { - return path.getName().endsWith(extension); + return path.getName().endsWith(extension) && + !(schemaInSourceDir && path.getName().equals(schemaFile)) ; } }; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java index 9ec7707..1b54895 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java @@ -69,7 +69,7 @@ public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedS public static final String DATE_PARTITIONED_SOURCE_PARTITION_SUFFIX = DATE_PARTITIONED_SOURCE_PREFIX + ".partition.suffix"; - static final String DATE_PARTITIONED_SOURCE_PARTITION_PATTERN = + public static final String DATE_PARTITIONED_SOURCE_PARTITION_PATTERN = DATE_PARTITIONED_SOURCE_PREFIX + ".partition.pattern"; public static final String DATE_PARTITIONED_SOURCE_PARTITION_GRANULARITY = @@ -99,7 +99,7 @@ public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedS * If this parameter is not specified the job will start reading data from * the beginning of Unix time. 
*/ - private static final String DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE = + public static final String DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE = DATE_PARTITIONED_SOURCE_PREFIX + ".min.watermark.value"; /** @@ -291,6 +291,11 @@ public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedS singleWorkUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, file.getWatermarkMsSinceEpoch()); singleWorkUnit.setProp(ConfigurationKeys.WORK_UNIT_DATE_PARTITION_KEY, file.getWatermarkMsSinceEpoch()); + if (this.sourceState.getPropAsBoolean(ConfigurationKeys.SCHEMA_IN_SOURCE_DIR, + ConfigurationKeys.DEFAULT_SCHEMA_IN_SOURCE_DIR)) { + addSchemaFile(file, singleWorkUnit); + } + multiWorkUnitWeightedQueue.addWorkUnit(singleWorkUnit, file.getFileSize()); this.fileCount++; @@ -302,6 +307,17 @@ public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedS } } + private void addSchemaFile(PartitionAwareFileRetriever.FileInfo dataFile, WorkUnit workUnit) + throws IOException { + Path schemaFile = new Path(new Path(dataFile.getFilePath()).getParent(), + workUnit.getProp(ConfigurationKeys.SCHEMA_FILENAME, ConfigurationKeys.DEFAULT_SCHEMA_FILENAME)); + if (fs.exists(schemaFile)) { + workUnit.setProp(ConfigurationKeys.SOURCE_SCHEMA, schemaFile.toString()); + } else { + throw new IOException("Schema file " + schemaFile + " does not exist."); + } + } + /** * Gets the LWM for this job runs. The new LWM is the HWM of the previous run + 1 unit (day,hour,minute..etc). * If there was no previous execution then it is set to the given lowWaterMark + 1 unit. 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/main/java/org/apache/gobblin/source/RegexBasedPartitionedRetriever.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/RegexBasedPartitionedRetriever.java b/gobblin-core/src/main/java/org/apache/gobblin/source/RegexBasedPartitionedRetriever.java index e082bda..7d3ad92 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/source/RegexBasedPartitionedRetriever.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/RegexBasedPartitionedRetriever.java @@ -48,6 +48,8 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev private Path sourceDir; private final String expectedExtension; private Duration leadTime; + private boolean schemaInSourceDir; + private String schemaFile; public RegexBasedPartitionedRetriever(String expectedExtension) { this.expectedExtension = expectedExtension; @@ -64,6 +66,10 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev this.pattern = Pattern.compile(regexPattern); this.helper = new HadoopFsHelper(state); this.sourceDir = new Path(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY)); + this.schemaInSourceDir = state.getPropAsBoolean(ConfigurationKeys.SCHEMA_IN_SOURCE_DIR, + ConfigurationKeys.DEFAULT_SCHEMA_IN_SOURCE_DIR); + this.schemaFile = this.schemaInSourceDir ? 
state.getProp(ConfigurationKeys.SCHEMA_FILENAME, + ConfigurationKeys.DEFAULT_SCHEMA_FILENAME) : ""; } @Override @@ -175,7 +181,8 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev return new PathFilter() { @Override public boolean accept(Path path) { - return path.getName().endsWith(extension); + return path.getName().endsWith(extension) && + !(schemaInSourceDir && path.getName().equals(schemaFile)) ; } }; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/DatePartitionedJsonFileExtractor.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/DatePartitionedJsonFileExtractor.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/DatePartitionedJsonFileExtractor.java new file mode 100644 index 0000000..b165a35 --- /dev/null +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/DatePartitionedJsonFileExtractor.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.source.extractor; + +import java.io.IOException; + +import org.apache.gobblin.configuration.WorkUnitState; + +public class DatePartitionedJsonFileExtractor extends SimpleJsonExtractor { + + public DatePartitionedJsonFileExtractor(WorkUnitState workUnitState) + throws IOException { + super(workUnitState); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/SimpleJsonExtractor.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/SimpleJsonExtractor.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/SimpleJsonExtractor.java new file mode 100644 index 0000000..412a06e --- /dev/null +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/SimpleJsonExtractor.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.source.extractor; + +import java.io.BufferedReader; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import org.apache.gobblin.source.extractor.hadoop.HadoopFsHelper; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.io.Closer; +import com.google.gson.JsonObject; +import com.google.gson.Gson; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.WorkUnitState; + +/** + * An implementation of {@link Extractor}. + * + * <p> + * This extractor reads the assigned input file storing + * json documents conforming to a schema. Each line of the file is a json document. + * </p> + */ +public class SimpleJsonExtractor implements Extractor<String, JsonObject> { + + private static final Logger LOGGER = LoggerFactory.getLogger(SimpleJsonExtractor.class); + private final WorkUnitState workUnitState; + private final FileSystem fs; + private final BufferedReader bufferedReader; + private final Closer closer = Closer.create(); + private static final Gson GSON = new Gson(); + + public SimpleJsonExtractor(WorkUnitState workUnitState) throws IOException { + this.workUnitState = workUnitState; + + HadoopFsHelper fsHelper = new HadoopFsHelper(workUnitState); + try { + fsHelper.connect(); + } catch (Exception e) { + throw new IOException("Exception at SimpleJsonExtractor"); + } + // Source is responsible to set SOURCE_FILEBASED_FILES_TO_PULL + this.fs = fsHelper.getFileSystem(); + InputStreamReader isr = new InputStreamReader(this.fs.open( + new Path(workUnitState.getProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL))), StandardCharsets.UTF_8); + + this.bufferedReader = + this.closer.register(new BufferedReader(isr)); + } + 
+ @Override + public String getSchema() throws IOException { + // Source is responsible to set SOURCE_SCHEMA + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + IOUtils.copyBytes(fs.open( + new Path(workUnitState.getProp(ConfigurationKeys.SOURCE_SCHEMA))), outputStream, 4096, false); + String schema = new String(outputStream.toByteArray(), StandardCharsets.UTF_8); + workUnitState.setProp((ConfigurationKeys.CONVERTER_AVRO_SCHEMA_KEY), schema); + return schema; + } + + @Override + public JsonObject readRecord(@Deprecated JsonObject reuse) throws DataRecordException, IOException { + String jsonString = this.bufferedReader.readLine(); + return GSON.fromJson(jsonString, JsonObject.class); + } + + @Override + public long getExpectedRecordCount() { + // We don't know how many records are in the file before actually reading them + return 0; + } + + @Override + public long getHighWatermark() { + // Watermark is not applicable for this type of extractor + return 0; + } + + @Override + public void close() throws IOException { + try { + this.closer.close(); + } catch (IOException ioe) { + LOGGER.error("Failed to close the input stream", ioe); + } + + try { + fs.close(); + } catch (IOException ioe) { + LOGGER.error("Failed to close the file object", ioe); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java index 9971d83..4cf6898 100644 --- a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java +++ 
b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java @@ -23,6 +23,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericArray; import org.apache.avro.generic.GenericRecord; import org.apache.commons.io.IOUtils; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.SourceState; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.source.workunit.Extract.TableType; @@ -52,8 +53,8 @@ public class JsonRecordAvroSchemaToAvroConverterTest { SourceState source = new SourceState(); this.state = new WorkUnitState( source.createWorkUnit(source.createExtract(TableType.SNAPSHOT_ONLY, "test_table", "test_namespace"))); - this.state.setProp(JsonRecordAvroSchemaToAvroConverter.AVRO_SCHEMA_KEY, avroSchemaString); - this.state.setProp(JsonRecordAvroSchemaToAvroConverter.IGNORE_FIELDS, "fieldToIgnore"); + this.state.setProp(ConfigurationKeys.CONVERTER_AVRO_SCHEMA_KEY, avroSchemaString); + this.state.setProp(ConfigurationKeys.CONVERTER_IGNORE_FIELDS, "fieldToIgnore"); } @Test http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java index 8cceff2..95b3656 100644 --- a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java +++ b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java @@ -21,10 +21,13 @@ import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.SourceState; import org.apache.gobblin.configuration.State; import 
org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.source.DatePartitionedJsonFileSource; +import org.apache.gobblin.source.PartitionedFileSourceBase; import org.apache.gobblin.source.extractor.DataRecordException; import org.apache.gobblin.source.extractor.Extractor; import org.apache.gobblin.source.workunit.Extract; import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.hadoop.fs.Path; import org.junit.Assert; import org.testng.annotations.Test; import org.testng.collections.Lists; @@ -32,6 +35,7 @@ import org.testng.collections.Lists; import java.io.IOException; import java.util.List; + @Test public class FileBasedSourceTest { @Test @@ -57,6 +61,25 @@ public class FileBasedSourceTest { } } + @Test void numberOfWorkUnits() throws IOException { + SourceState sourceState = new SourceState(); + DatePartitionedJsonFileSource source = new DatePartitionedJsonFileSource(); + initState(sourceState); + List<WorkUnit> workUnits = source.getWorkunits(sourceState); + Assert.assertEquals(3, workUnits.size()); + } + + private void initState(State state) { + state.setProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY, + new Path(getClass().getResource("/source").toString()).toString()); + state.setProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_PATTERN, "yyyy-MM"); + state.setProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE, "2017-11"); + state.setProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, "snapshot_only"); + state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, "file:///"); + state.setProp(ConfigurationKeys.SCHEMA_IN_SOURCE_DIR, "true"); + state.setProp(ConfigurationKeys.SCHEMA_FILENAME, "metadata.json"); + } + private static class DummyFileBasedSource extends FileBasedSource<String, String> { @Override public void initFileSystemHelper(State state) throws FileBasedHelperException { 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/test/resources/source/2017-12/metadata.json ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/resources/source/2017-12/metadata.json b/gobblin-core/src/test/resources/source/2017-12/metadata.json new file mode 100644 index 0000000..0003f63 --- /dev/null +++ b/gobblin-core/src/test/resources/source/2017-12/metadata.json @@ -0,0 +1 @@ +{"namespace":"example.avro", "type":"record", "name":"User", "fields":[{"name":"name", "type":"string"}, {"name":"favorite_number", "type":"int"}, {"name":"favorite_color", "type":"string"}]} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/test/resources/source/2017-12/simplejson.json ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/resources/source/2017-12/simplejson.json b/gobblin-core/src/test/resources/source/2017-12/simplejson.json new file mode 100644 index 0000000..c325df0 --- /dev/null +++ b/gobblin-core/src/test/resources/source/2017-12/simplejson.json @@ -0,0 +1,3 @@ +{"name": "Alyssa", "favorite_number": 256, "favorite_color": "yellow"} +{"name": "Ben", "favorite_number": 7, "favorite_color": "red"} +{"name": "Charlie", "favorite_number": 68, "favorite_color": "blue"} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/test/resources/source/2018-01/metadata.json ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/resources/source/2018-01/metadata.json b/gobblin-core/src/test/resources/source/2018-01/metadata.json new file mode 100644 index 0000000..0003f63 --- /dev/null +++ b/gobblin-core/src/test/resources/source/2018-01/metadata.json @@ -0,0 +1 @@ +{"namespace":"example.avro", "type":"record", "name":"User", "fields":[{"name":"name", "type":"string"}, {"name":"favorite_number", "type":"int"}, 
{"name":"favorite_color", "type":"string"}]} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/test/resources/source/2018-01/simplejson.json ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/resources/source/2018-01/simplejson.json b/gobblin-core/src/test/resources/source/2018-01/simplejson.json new file mode 100644 index 0000000..c325df0 --- /dev/null +++ b/gobblin-core/src/test/resources/source/2018-01/simplejson.json @@ -0,0 +1,3 @@ +{"name": "Alyssa", "favorite_number": 256, "favorite_color": "yellow"} +{"name": "Ben", "favorite_number": 7, "favorite_color": "red"} +{"name": "Charlie", "favorite_number": 68, "favorite_color": "blue"} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/test/resources/source/2018-01/simplejson2.json ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/resources/source/2018-01/simplejson2.json b/gobblin-core/src/test/resources/source/2018-01/simplejson2.json new file mode 100644 index 0000000..c325df0 --- /dev/null +++ b/gobblin-core/src/test/resources/source/2018-01/simplejson2.json @@ -0,0 +1,3 @@ +{"name": "Alyssa", "favorite_number": 256, "favorite_color": "yellow"} +{"name": "Ben", "favorite_number": 7, "favorite_color": "red"} +{"name": "Charlie", "favorite_number": 68, "favorite_color": "blue"}
