Repository: incubator-gobblin Updated Branches: refs/heads/master 0795fa7a0 -> b7f123f77
[GOBBLIN-427] Add a decryption converter Closes #2304 from xzhang27/master Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b7f123f7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b7f123f7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b7f123f7 Branch: refs/heads/master Commit: b7f123f77a58c690a9acf89f6d3168aeda259a17 Parents: 0795fa7 Author: Xiang <[email protected]> Authored: Mon Mar 19 11:39:46 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Mon Mar 19 11:39:55 2018 -0700 ---------------------------------------------------------------------- .../gobblin/crypto/EncryptionConfigParser.java | 4 +- .../crypto/EncryptionConfigParserTest.java | 4 +- .../copy/converter/DecryptConverter.java | 2 +- ...alizedRecordToSerializedRecordConverter.java | 44 +++++++++ ...ordToEncryptedSerializedRecordConverter.java | 2 +- .../StringFieldEncryptorConverter.java | 2 +- ...edRecordToSerializedRecordConverterTest.java | 95 ++++++++++++++++++++ ...edRecordToSerializedRecordConverterBase.java | 79 ++++++++++++++++ ...dRecordWithMetadataToRecordWithMetadata.java | 81 +++++++++++++++++ ...ordWithMetadataToRecordWithMetadataTest.java | 75 ++++++++++++++++ 10 files changed, 382 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-core-base/src/main/java/org/apache/gobblin/crypto/EncryptionConfigParser.java ---------------------------------------------------------------------- diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/crypto/EncryptionConfigParser.java b/gobblin-core-base/src/main/java/org/apache/gobblin/crypto/EncryptionConfigParser.java index 324365a..900b616 100644 --- a/gobblin-core-base/src/main/java/org/apache/gobblin/crypto/EncryptionConfigParser.java +++ 
b/gobblin-core-base/src/main/java/org/apache/gobblin/crypto/EncryptionConfigParser.java @@ -52,6 +52,7 @@ public class EncryptionConfigParser { */ static final String WRITER_ENCRYPT_PREFIX = ConfigurationKeys.WRITER_PREFIX + ".encrypt"; static final String CONVERTER_ENCRYPT_PREFIX = "converter.encrypt"; + static final String CONVERTER_DECRYPT_PREFIX = "converter.decrypt"; public static final String ENCRYPTION_ALGORITHM_KEY = "algorithm"; public static final String ENCRYPTION_KEYSTORE_PATH_KEY = "keystore_path"; @@ -72,7 +73,8 @@ public class EncryptionConfigParser { * enum maps entity type to a configuration prefix. */ public enum EntityType { - CONVERTER(CONVERTER_ENCRYPT_PREFIX), + CONVERTER_ENCRYPT(CONVERTER_ENCRYPT_PREFIX), + CONVERTER_DECRYPT(CONVERTER_DECRYPT_PREFIX), WRITER(WRITER_ENCRYPT_PREFIX); private final String configPrefix; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-core-base/src/test/java/org/apache/gobblin/crypto/EncryptionConfigParserTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core-base/src/test/java/org/apache/gobblin/crypto/EncryptionConfigParserTest.java b/gobblin-core-base/src/test/java/org/apache/gobblin/crypto/EncryptionConfigParserTest.java index 68fbf37..54b52d4 100644 --- a/gobblin-core-base/src/test/java/org/apache/gobblin/crypto/EncryptionConfigParserTest.java +++ b/gobblin-core-base/src/test/java/org/apache/gobblin/crypto/EncryptionConfigParserTest.java @@ -97,7 +97,7 @@ public class EncryptionConfigParserTest { "keyname"); wuState.setProp(EncryptionConfigParser.CONVERTER_ENCRYPT_PREFIX + "abc.def", "foobar"); - Map<String, Object> parsedProperties = EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER, wuState); + Map<String, Object> parsedProperties = EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER_ENCRYPT, wuState); Assert.assertNotNull(parsedProperties, "Expected parser 
to only return one record"); Assert.assertEquals(parsedProperties.size(), 4, "Did not expect abc.def to be picked up in config"); @@ -125,7 +125,7 @@ public class EncryptionConfigParserTest { "keyname"); wuState.setProp(EncryptionConfigParser.CONVERTER_ENCRYPT_PREFIX + "abc.def", "foobar"); - Map<String, Object> parsedProperties = EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER, entityName, wuState); + Map<String, Object> parsedProperties = EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER_ENCRYPT, entityName, wuState); Assert.assertNotNull(parsedProperties, "Expected parser to only return one record"); Assert.assertEquals(parsedProperties.size(), 4, "Did not expect abc.def to be picked up in config"); Assert.assertEquals(EncryptionConfigParser.getEncryptionType(parsedProperties), "aes_rotating"); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/converter/DecryptConverter.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/converter/DecryptConverter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/converter/DecryptConverter.java index d074046..d9c6353 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/converter/DecryptConverter.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/converter/DecryptConverter.java @@ -59,7 +59,7 @@ public class DecryptConverter extends DistcpConverter { @Override public Converter<String, String, FileAwareInputStream, FileAwareInputStream> init(WorkUnitState workUnit) { Map<String, Object> config = - EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER, workUnit); + 
EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER_ENCRYPT, workUnit); if (config == null) { // Backwards compatibility check: if no config was passed in via the standard config, revert back to GPG http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverter.java b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverter.java new file mode 100644 index 0000000..2088eee --- /dev/null +++ b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.converter; + +import java.util.Map; +import org.apache.gobblin.codec.StreamCodec; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.crypto.EncryptionConfigParser; +import org.apache.gobblin.crypto.EncryptionFactory; + + +/** + * Specific implementation of {@link EncryptedSerializedRecordToSerializedRecordConverterBase} that uses Gobblin's + * {@link EncryptionFactory} to build the proper decryption codec based on config. + * <p>Decryption settings are read under the "converter.decrypt" prefix ({@code EntityType.CONVERTER_DECRYPT}). + */ +public class EncryptedSerializedRecordToSerializedRecordConverter extends EncryptedSerializedRecordToSerializedRecordConverterBase { + /** + * Builds the decryption codec for this converter. + * <p>Looks up the {@code CONVERTER_DECRYPT} settings with {@link EncryptionConfigParser#getConfigForBranch}, + * passing this class's simple name as the entity name, and hands the resulting map to + * {@link EncryptionFactory#buildStreamCryptoProvider}. + * @param config the work unit state to read the decryption settings from + * @return the codec built by {@link EncryptionFactory} from that config + * @throws IllegalStateException if no decryption config is found + */ @Override + protected StreamCodec buildDecryptor(WorkUnitState config) { + Map<String, Object> decryptionConfig = + EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER_DECRYPT, + getClass().getSimpleName(), config); + if (decryptionConfig == null) { + throw new IllegalStateException("No decryption config specified in job - can't decrypt!"); + } + + return EncryptionFactory.buildStreamCryptoProvider(decryptionConfig); + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/SerializedRecordToEncryptedSerializedRecordConverter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/SerializedRecordToEncryptedSerializedRecordConverter.java b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/SerializedRecordToEncryptedSerializedRecordConverter.java index 2c830a9..be550c7 100644 --- a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/SerializedRecordToEncryptedSerializedRecordConverter.java +++
b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/SerializedRecordToEncryptedSerializedRecordConverter.java @@ -32,7 +32,7 @@ public class SerializedRecordToEncryptedSerializedRecordConverter extends Serial @Override protected StreamCodec buildEncryptor(WorkUnitState config) { Map<String, Object> encryptionConfig = - EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER, getClass().getSimpleName(), config); + EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER_ENCRYPT, getClass().getSimpleName(), config); if (encryptionConfig == null) { throw new IllegalStateException("No encryption config specified in job - can't encrypt!"); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java index d6e8de1..f242ec5 100644 --- a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java +++ b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java @@ -47,7 +47,7 @@ public abstract class StringFieldEncryptorConverter<SCHEMA, DATA> extends Conver public Converter<SCHEMA, SCHEMA, DATA, DATA> init(WorkUnitState workUnit) { super.init(workUnit); Map<String, Object> config = EncryptionConfigParser - .getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER, getClass().getSimpleName(), workUnit); + .getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER_ENCRYPT, getClass().getSimpleName(), workUnit); 
encryptor = EncryptionFactory.buildStreamCryptoProvider(config); String fieldsToEncryptConfig = workUnit.getProp(FIELDS_TO_ENCRYPT_CONFIG_NAME, null); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverterTest.java b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverterTest.java new file mode 100644 index 0000000..efaeb16 --- /dev/null +++ b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverterTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.converter; + +import java.util.Iterator; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.crypto.EncryptionConfigParser; +import org.apache.gobblin.metadata.types.Metadata; +import org.apache.gobblin.test.crypto.InsecureShiftCodec; +import org.apache.gobblin.type.RecordWithMetadata; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +/** + * Tests for {@link EncryptedSerializedRecordToSerializedRecordConverter} using the "insecure_shift" test codec. + */ +public class EncryptedSerializedRecordToSerializedRecordConverterTest { + + private WorkUnitState workUnitState; + private EncryptedSerializedRecordToSerializedRecordConverter converter; + private RecordWithMetadata<byte[]> sampleRecord; + private byte[] shiftedValue; + private String insecureShiftTag; + + private static final String DECRYPT_PREFIX = "converter.decrypt."; + + /* + * Rebuild every fixture before EACH test method. The tests mutate shared state (job-level decryption config, + * the fork branch id, and the sample record's transfer-encoding metadata), so a once-per-<test> @BeforeTest + * setup would make throwsIfMisconfigured pass or fail depending on execution order. + */ + @BeforeMethod + public void setUp() { + workUnitState = new WorkUnitState(); + converter = new EncryptedSerializedRecordToSerializedRecordConverter(); + sampleRecord = new RecordWithMetadata<>(new byte[]{'b', 'c', 'd', 'e'}, new Metadata()); + shiftedValue = new byte[]{'a', 'b', 'c', 'd'}; + insecureShiftTag = InsecureShiftCodec.TAG; + } + + @Test(expectedExceptions = IllegalStateException.class) + public void throwsIfMisconfigured() + throws DataConversionException { + converter.init(workUnitState); + converter.convertRecord("", sampleRecord, workUnitState); + } + + @Test + public void worksWithFork() + throws DataConversionException { + workUnitState.setProp(ConfigurationKeys.FORK_BRANCH_ID_KEY, 2); + workUnitState.getJobState() + .setProp(DECRYPT_PREFIX + EncryptionConfigParser.ENCRYPTION_ALGORITHM_KEY + ".2", + "insecure_shift"); + + converter.init(workUnitState); + Iterable<RecordWithMetadata<byte[]>> records = converter.convertRecord("", sampleRecord, workUnitState); + Iterator<RecordWithMetadata<byte[]>> recordIterator = records.iterator();
+ Assert.assertTrue(recordIterator.hasNext()); + + RecordWithMetadata<byte[]> record = recordIterator.next(); + + Assert.assertFalse(recordIterator.hasNext()); + Assert.assertEquals(record.getMetadata().getGlobalMetadata().getTransferEncoding().get(0), insecureShiftTag); + Assert.assertEquals(record.getRecord(), shiftedValue); + } + + @Test + public void worksNoFork() + throws DataConversionException { + workUnitState.getJobState() + .setProp(DECRYPT_PREFIX + EncryptionConfigParser.ENCRYPTION_ALGORITHM_KEY, + "insecure_shift"); + converter.init(workUnitState); + Iterable<RecordWithMetadata<byte[]>> records = converter.convertRecord("", sampleRecord, workUnitState); + Iterator<RecordWithMetadata<byte[]>> recordIterator = records.iterator(); + Assert.assertTrue(recordIterator.hasNext()); + + RecordWithMetadata<byte[]> record = recordIterator.next(); + + Assert.assertFalse(recordIterator.hasNext()); + Assert.assertEquals(record.getMetadata().getGlobalMetadata().getTransferEncoding().get(0), insecureShiftTag); + Assert.assertEquals(record.getRecord(), shiftedValue); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverterBase.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverterBase.java b/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverterBase.java new file mode 100644 index 0000000..7b6baca --- /dev/null +++ b/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverterBase.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.converter; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.Collections; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.IOUtils; +import org.apache.gobblin.codec.StreamCodec; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.type.RecordWithMetadata; + + +/** + * A converter that converts an encrypted {@link org.apache.gobblin.type.SerializedRecordWithMetadata} to + * a {@link org.apache.gobblin.type.SerializedRecordWithMetadata}. The decryption algorithm used will be + * appended to the Transfer-Encoding of the new record. + * <p>Records are handled as {@code RecordWithMetadata<byte[]>}: each payload is streamed through the + * {@link StreamCodec} returned by {@link #buildDecryptor(WorkUnitState)}, which is built once in + * {@link #init(WorkUnitState)}. {@link #convertSchema} always returns an empty string. + */ +@Slf4j +public abstract class EncryptedSerializedRecordToSerializedRecordConverterBase extends Converter<String, String, RecordWithMetadata<byte[]>, RecordWithMetadata<byte[]>> { + private StreamCodec decryptor; + + @Override + public Converter<String, String, RecordWithMetadata<byte[]>, RecordWithMetadata<byte[]>> init( + WorkUnitState workUnit) { + super.init(workUnit); + decryptor = buildDecryptor(workUnit); + return this; + } + + /** + * Build the StreamCodec that will be used to decrypt each byte record. Must be provided by concrete + * implementations of this class. + * <p>Called once from {@link #init(WorkUnitState)}; the returned codec is stored and reused for every record. + * @param config the work unit state passed to {@code init} + * @return the codec whose {@code decodeInputStream} and {@code getTag} are used by {@code convertRecord}
+ */ + protected abstract StreamCodec buildDecryptor(WorkUnitState config); + + /** + * Schemas are not used by this converter; always returns an empty string. + */ @Override + public String convertSchema(String inputSchema, WorkUnitState workUnit) + throws SchemaConversionException { + return ""; + } + + /** + * Decrypts the payload of {@code inputRecord} and returns a single-element iterable holding the plaintext bytes. + * <p>Note: the decryptor's tag is added to {@code inputRecord}'s own global metadata, and that same metadata + * instance is attached to the returned record, so the input record is modified as a side effect. + * @throws DataConversionException wrapping any exception raised while converting the record + */ @Override + public Iterable<RecordWithMetadata<byte[]>> convertRecord(String outputSchema, RecordWithMetadata<byte[]> inputRecord, + WorkUnitState workUnit) + throws DataConversionException { + try { + ByteArrayInputStream inputStream = new ByteArrayInputStream(inputRecord.getRecord()); + byte[] decryptedBytes; + try (InputStream decryptedInputStream = decryptor.decodeInputStream(inputStream)) { + decryptedBytes = IOUtils.toByteArray(decryptedInputStream); + } + inputRecord.getMetadata().getGlobalMetadata().addTransferEncoding(decryptor.getTag()); + + RecordWithMetadata<byte[]> serializedRecord = + new RecordWithMetadata<byte[]>(decryptedBytes, inputRecord.getMetadata()); + return Collections.singleton(serializedRecord); + } catch (Exception e) { + throw new DataConversionException(e); + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/EnvelopedRecordWithMetadataToRecordWithMetadata.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/EnvelopedRecordWithMetadataToRecordWithMetadata.java b/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/EnvelopedRecordWithMetadataToRecordWithMetadata.java new file mode 100644 index 0000000..c686091 --- /dev/null +++ b/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/EnvelopedRecordWithMetadataToRecordWithMetadata.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.converter; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.metadata.types.Metadata; +import org.apache.gobblin.type.RecordWithMetadata; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.map.DeserializationConfig; +import org.codehaus.jackson.map.ObjectMapper; + +/** + * A converter that takes a {@link RecordWithMetadata} whose byte payload is a JSON envelope and unwraps it. + * It looks up two fields: "r", the record itself, which must be a JSON string, and the optional "rMd", whose + * "recordMetadata" object is copied into the record-level metadata of the output record. + * <p>Payloads that are not a JSON object with a string "r" field are rejected with a + * {@link DataConversionException}.
+ */ +public class EnvelopedRecordWithMetadataToRecordWithMetadata extends Converter<String, Object, RecordWithMetadata<byte[]>, RecordWithMetadata<?>> { + + private static final String RECORD_KEY = "r"; + private static final String METADATA_KEY = "rMd"; + private static final String METADATA_RECORD_KEY = "recordMetadata"; + + private static final ObjectMapper objectMapper = new ObjectMapper().configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); + private static final JsonFactory jsonFactory = new JsonFactory(); + + /** + * Schemas are not used by this converter; always returns an empty string. + */ + @Override + public String convertSchema(String inputSchema, WorkUnitState workUnit) + throws SchemaConversionException { + return ""; + } + + /** + * Parses the envelope and returns a single {@link RecordWithMetadata} holding the string record and any + * record-level metadata found under "rMd"."recordMetadata". The input record's own metadata is not carried over. + * + * @throws DataConversionException if the payload is not valid JSON, has no JSON content, lacks an "r" field, + * or its "r" field is not a JSON string + */ + @Override + public Iterable<RecordWithMetadata<?>> convertRecord(Object outputSchema, RecordWithMetadata<byte[]> inputRecord, + WorkUnitState workUnit) + throws DataConversionException { + + try { + try (JsonParser parser = jsonFactory.createJsonParser(inputRecord.getRecord())) { + parser.setCodec(objectMapper); + JsonNode jsonNode = parser.readValueAsTree(); + + // extracts required record; readValueAsTree() returns null when the payload has no JSON content + if (jsonNode == null || !jsonNode.has(RECORD_KEY)) { + throw new DataConversionException("Input data does not have record."); + } + JsonNode recordNode = jsonNode.get(RECORD_KEY); + // getTextValue() silently returns null for non-string nodes (objects, numbers, JSON null), which + // would emit a null record downstream; reject those explicitly instead. + if (!recordNode.isTextual()) { + throw new DataConversionException("Input data record is not a string."); + } + String record = recordNode.getTextValue(); + + // Extract metadata field + Metadata md = new Metadata(); + if (jsonNode.has(METADATA_KEY) && jsonNode.get(METADATA_KEY).has(METADATA_RECORD_KEY)) { + md.getRecordMetadata().putAll(objectMapper.readValue(jsonNode.get(METADATA_KEY).get(METADATA_RECORD_KEY), Map.class)); + } + + return Collections.singleton(new RecordWithMetadata<>(record, md)); + } + } catch (IOException e) { + throw new DataConversionException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-modules/gobblin-metadata/src/test/java/org/apache/gobblin/converter/EnvelopedRecordWithMetadataToRecordWithMetadataTest.java ---------------------------------------------------------------------- diff --git
a/gobblin-modules/gobblin-metadata/src/test/java/org/apache/gobblin/converter/EnvelopedRecordWithMetadataToRecordWithMetadataTest.java b/gobblin-modules/gobblin-metadata/src/test/java/org/apache/gobblin/converter/EnvelopedRecordWithMetadataToRecordWithMetadataTest.java new file mode 100644 index 0000000..8d247a7 --- /dev/null +++ b/gobblin-modules/gobblin-metadata/src/test/java/org/apache/gobblin/converter/EnvelopedRecordWithMetadataToRecordWithMetadataTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.converter; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Iterator; +import org.apache.gobblin.metadata.types.Metadata; +import org.apache.gobblin.type.RecordWithMetadata; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.map.ObjectMapper; +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * Tests for {@link EnvelopedRecordWithMetadataToRecordWithMetadata}. Envelopes are built by serializing a map with + * "r" and "rMd" entries to JSON and encoding it explicitly as UTF-8, so the tests do not depend on the platform + * default charset. + */ +@Test +public class EnvelopedRecordWithMetadataToRecordWithMetadataTest { + + @Test + public void testSuccessWithRecord() throws DataConversionException, IOException { + ObjectMapper objectMapper = new ObjectMapper(); + String innerRecord = "abracadabra"; + + // Build the input record + HashMap<String, Object> map = new HashMap<>(); + map.put("r", innerRecord); + Metadata md = new Metadata(); + md.getRecordMetadata().put("test1", "test2"); + map.put("rMd", md); + JsonNode jsonNode = objectMapper.valueToTree(map); + RecordWithMetadata<byte[]> inputRecord = new RecordWithMetadata<>(jsonNode.toString().getBytes(StandardCharsets.UTF_8), null); + + EnvelopedRecordWithMetadataToRecordWithMetadata converter = new EnvelopedRecordWithMetadataToRecordWithMetadata(); + Iterator<RecordWithMetadata<?>> iterator = + converter.convertRecord(null, inputRecord, null).iterator(); + + Assert.assertTrue(iterator.hasNext()); + + RecordWithMetadata<?> outputRecord = iterator.next(); + + Assert.assertEquals(outputRecord.getRecord(), innerRecord); + Assert.assertEquals(outputRecord.getMetadata().getRecordMetadata().get("test1"), "test2"); + } + + @Test(expectedExceptions = DataConversionException.class, expectedExceptionsMessageRegExp = "Input data does not have record.") + public void testFailureWithoutRecord() throws DataConversionException, IOException { + ObjectMapper objectMapper = new ObjectMapper(); + + // Build the input record without data + HashMap<String, Object> map = new HashMap<>(); + Metadata md = new Metadata(); + md.getRecordMetadata().put("test1", "test2"); + map.put("rMd", md); + JsonNode jsonNode =
objectMapper.valueToTree(map); + RecordWithMetadata<byte[]> inputRecord = new RecordWithMetadata<>(jsonNode.toString().getBytes(StandardCharsets.UTF_8), null); + + EnvelopedRecordWithMetadataToRecordWithMetadata converter = new EnvelopedRecordWithMetadataToRecordWithMetadata(); + converter.convertRecord(null, inputRecord, null); + } + +}
