Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 461999eda -> 3323543b6
[GOBBLIN-459] Support encryption and decryption of strings + arrays of strings Closes #2330 from eogren/master Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/3323543b Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/3323543b Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/3323543b Branch: refs/heads/master Commit: 3323543b6cfb5e5cfbfbea85783a5976f3a76267 Parents: 461999e Author: Eric Ogren <[email protected]> Authored: Sun Apr 8 23:04:42 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Sun Apr 8 23:04:42 2018 -0700 ---------------------------------------------------------------------- .../gobblin/recordaccess/RecordAccessor.java | 7 + .../recordaccess/AvroGenericRecordAccessor.java | 60 +++++++- .../AvroGenericRecordAccessorTest.java | 49 ++++++- .../RecordAccessorProviderFactoryTest.java | 16 +++ .../resources/converter/fieldPickInput.avsc | 3 +- .../converter/fieldPickInput_arrays.avro | Bin 0 -> 552 bytes .../AvroStringFieldDecryptorConverter.java | 34 +++++ .../StringFieldDecryptorConverter.java | 131 +++++++++++++++++ .../StringFieldEncryptorConverter.java | 45 ++++-- .../AvroStringFieldDecryptorConverterTest.java | 141 +++++++++++++++++++ .../AvroStringFieldEncryptorConverterTest.java | 62 +++++++- .../test/resources/fieldPickInput_arrays.avro | Bin 0 -> 552 bytes 12 files changed, 525 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-api/src/main/java/org/apache/gobblin/recordaccess/RecordAccessor.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/recordaccess/RecordAccessor.java b/gobblin-api/src/main/java/org/apache/gobblin/recordaccess/RecordAccessor.java index 5791771..10f2fb3 100644 --- 
a/gobblin-api/src/main/java/org/apache/gobblin/recordaccess/RecordAccessor.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/recordaccess/RecordAccessor.java @@ -16,6 +16,7 @@ */ package org.apache.gobblin.recordaccess; +import java.util.List; import java.util.Map; import org.apache.gobblin.annotation.Alpha; @@ -72,14 +73,19 @@ public interface RecordAccessor { * IncorrectTypeException if the underlying types do not match. Getters should not * try to do any type coercion -- for example, getAsInt for a value that is the string "1" * should throw a Sch. + * + * The get*Generic functions should return the following object types: + * String, Integer, Long, or a List of them. */ Map<String, String> getMultiAsString(String fieldName); Map<String, Integer> getMultiAsInt(String fieldName); Map<String, Long> getMultiAsLong(String fieldName); + Map<String, Object> getMultiGeneric(String fieldName); String getAsString(String fieldName); Integer getAsInt(String fieldName); Long getAsLong(String fieldName); + Object getGeneric(String fieldName); /* * Set new values for an object. 
Should throw a FieldDoesNotExistException runtime exception if fieldName @@ -88,6 +94,7 @@ public interface RecordAccessor { void set(String fieldName, String value); void set(String fieldName, Integer value); void set(String fieldName, Long value); + void setStringArray(String fieldName, List<String> value); void setToNull(String fieldName); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-core/src/main/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessor.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessor.java b/gobblin-core/src/main/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessor.java index d51b5db..620933d 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessor.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessor.java @@ -16,6 +16,7 @@ */ package org.apache.gobblin.recordaccess; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -23,6 +24,7 @@ import java.util.Map; import org.apache.avro.AvroRuntimeException; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericArray; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.util.Utf8; @@ -48,7 +50,7 @@ public class AvroGenericRecordAccessor implements RecordAccessor { @Override public Map<String, String> getMultiAsString(String fieldName) { - Map<String, Object> vals = getMultiAsObject(fieldName); + Map<String, Object> vals = getMultiGeneric(fieldName); Map<String, String> ret = new HashMap<>(); for (Map.Entry<String, Object> entry : vals.entrySet()) { @@ -69,9 +71,12 @@ public class AvroGenericRecordAccessor implements RecordAccessor { return convertToString(fieldName, obj); } + private String convertToString(String 
fieldName, Object obj) { if (obj == null) { return null; + } else if (obj instanceof String) { + return (String)obj; } else if (obj instanceof Utf8) { return obj.toString(); } else { @@ -81,7 +86,7 @@ public class AvroGenericRecordAccessor implements RecordAccessor { @Override public Map<String, Integer> getMultiAsInt(String fieldName) { - Map<String, Object> vals = getMultiAsObject(fieldName); + Map<String, Object> vals = getMultiGeneric(fieldName); Map<String, Integer> ret = new HashMap<>(); for (Map.Entry<String, Object> entry : vals.entrySet()) { @@ -107,7 +112,7 @@ public class AvroGenericRecordAccessor implements RecordAccessor { @Override public Map<String, Long> getMultiAsLong(String fieldName) { - Map<String, Object> vals = getMultiAsObject(fieldName); + Map<String, Object> vals = getMultiGeneric(fieldName); Map<String, Long> ret = new HashMap<>(); for (Map.Entry<String, Object> entry : vals.entrySet()) { @@ -151,8 +156,36 @@ public class AvroGenericRecordAccessor implements RecordAccessor { return obj.isPresent() ? 
obj.get() : null; } - private Map<String, Object> getMultiAsObject(String fieldName) { - return AvroUtils.getMultiFieldValue(record, fieldName); + @Override + public Map<String, Object> getMultiGeneric(String fieldName) { + Map<String, Object> vals = AvroUtils.getMultiFieldValue(record, fieldName); + for (Map.Entry<String, Object> entry: vals.entrySet()) { + vals.put(entry.getKey(), convertAvroToJava(entry.getKey(), entry.getValue())); + } + + return vals; + } + + @Override + public Object getGeneric(String fieldName) { + Object val = getAsObject(fieldName); + return convertAvroToJava(fieldName, val); + } + + private Object convertAvroToJava(String fieldName, Object val) { + if (val == null || val instanceof String || val instanceof Long || val instanceof Integer) { + return val; + } + + if (val instanceof Utf8) { + return convertToString(fieldName, val); + } + + if (val instanceof GenericArray) { + return convertToList(fieldName, (GenericArray) val); + } + + throw new IllegalArgumentException("Don't know how to parse object of type " + val.getClass().getCanonicalName()); } @Override @@ -171,6 +204,13 @@ public class AvroGenericRecordAccessor implements RecordAccessor { } @Override + public void setStringArray(String fieldName, List<String> value) { + GenericData.Array<String> avroArray = new GenericData.Array<>( + Schema.createArray(Schema.create(Schema.Type.STRING)), value); + set(fieldName, avroArray); + } + + @Override public void setToNull(String fieldName) { set(fieldName, (Object) null); } @@ -225,4 +265,14 @@ public class AvroGenericRecordAccessor implements RecordAccessor { throw new FieldDoesNotExistException("Field not found setting name " + fieldName, e); } } + + @SuppressWarnings("unchecked") + private List convertToList(String fieldName, GenericArray arr) { + List ret = new ArrayList(); + for (int i = 0; i < arr.size(); i++) { + ret.add(convertAvroToJava(fieldName + "." 
+ String.valueOf(i), arr.get(i))); + } + + return ret; + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessorTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessorTest.java b/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessorTest.java index ed43aee..17e46b0 100644 --- a/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessorTest.java +++ b/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessorTest.java @@ -19,6 +19,7 @@ package org.apache.gobblin.recordaccess; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; +import java.util.List; import java.util.Map; import org.apache.avro.Schema; @@ -31,12 +32,15 @@ import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumWriter; import org.apache.avro.io.EncoderFactory; +import org.apache.avro.util.Utf8; import org.testng.Assert; import org.testng.ITestResult; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import com.google.common.collect.ImmutableList; + public class AvroGenericRecordAccessorTest { private Schema recordSchema; @@ -112,6 +116,39 @@ public class AvroGenericRecordAccessorTest { Assert.assertEquals(accessor.getAsString("nestedRecords.1.fieldToEncrypt"), "val1"); } + @Test + public void testSetStringArray() throws IOException { + List<String> quotes = ImmutableList.of("abracadabra", "hocuspocus"); + accessor.setStringArray("favorite_quotes", quotes); + + Assert.assertEquals(accessor.getGeneric("favorite_quotes"), quotes); + } + + @Test + public void testGetStringArrayUtf8() throws IOException { + // Expectation: Even though we 
read an Avro object with UTF8 underneath, the accessor converts it into a + // Java String + List<String> expectedQuotes = ImmutableList.of("abc", "defg"); + + GenericData.Array<Utf8> strings = new GenericData.Array<Utf8>(2, Schema.createArray(Schema.create(Schema.Type.STRING))); + expectedQuotes.forEach(s -> strings.add(new Utf8(s))); + record.put("favorite_quotes", strings); + + Assert.assertEquals(accessor.getGeneric("favorite_quotes"), expectedQuotes); + } + + @Test + public void testGetMultiConvertsStrings() throws IOException { + updateRecordFromTestResource("converter/fieldPickInput", "converter/fieldPickInput_arrays.avro"); + Map<String, Object> ret = accessor.getMultiGeneric("favorite_quotes"); + Object val = ret.get("favorite_quotes"); + + Assert.assertTrue(val instanceof List); + List castedVal = (List)val; + Assert.assertEquals(2, castedVal.size()); + Assert.assertEquals("hello world", castedVal.get(0)); + Assert.assertEquals("foobar", castedVal.get(1)); + } @Test public void testSetValueFromArray() throws IOException { @@ -184,15 +221,23 @@ public class AvroGenericRecordAccessorTest { record.put("created", 0L); } - private void updateRecordFromTestResource(String resourceName) + private void updateRecordFromTestResource(String resourceName) throws IOException { + updateRecordFromTestResource(resourceName, null); + } + + private void updateRecordFromTestResource(String resourceName, String avroFileName) throws IOException { + if (avroFileName == null) { + avroFileName = resourceName + ".avro"; + } + recordSchema = new Schema.Parser().parse( getClass().getClassLoader().getResourceAsStream(resourceName + ".avsc") ); DatumReader<GenericRecord> reader = new GenericDatumReader<>(recordSchema); DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>( - new File(getClass().getClassLoader().getResource(resourceName + ".avro").getPath()), reader); + new File(getClass().getClassLoader().getResource(avroFileName).getPath()), reader); 
Assert.assertTrue(dataFileReader.hasNext()); record = dataFileReader.next(record); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/RecordAccessorProviderFactoryTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/RecordAccessorProviderFactoryTest.java b/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/RecordAccessorProviderFactoryTest.java index a6a8ce0..3ed37f4 100644 --- a/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/RecordAccessorProviderFactoryTest.java +++ b/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/RecordAccessorProviderFactoryTest.java @@ -17,6 +17,7 @@ package org.apache.gobblin.recordaccess; import java.io.IOException; +import java.util.List; import java.util.Map; import org.apache.avro.Schema; @@ -121,6 +122,21 @@ public class RecordAccessorProviderFactoryTest { } @Override + public Map<String, Object> getMultiGeneric(String fieldName) { + return null; + } + + @Override + public Object getGeneric(String fieldName) { + return null; + } + + @Override + public void setStringArray(String fieldName, List<String> value) { + + } + + @Override public Map<String, String> getMultiAsString(String fieldName) { return null; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-core/src/test/resources/converter/fieldPickInput.avsc ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/resources/converter/fieldPickInput.avsc b/gobblin-core/src/test/resources/converter/fieldPickInput.avsc index 29b40a1..4da3fdf 100644 --- a/gobblin-core/src/test/resources/converter/fieldPickInput.avsc +++ b/gobblin-core/src/test/resources/converter/fieldPickInput.avsc @@ -7,6 +7,7 @@ {"name": "favorite_color", "type": ["string", "null"]}, {"name": "date_of_birth", "type": "long"}, {"name": 
"last_modified", "type": "long"}, - {"name": "created", "type": "long"} + {"name": "created", "type": "long"}, + {"name": "favorite_quotes", "type": [{ "type": "array", "items": "string"}, "null"], "default": null} ] } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-core/src/test/resources/converter/fieldPickInput_arrays.avro ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/resources/converter/fieldPickInput_arrays.avro b/gobblin-core/src/test/resources/converter/fieldPickInput_arrays.avro new file mode 100644 index 0000000..c10a607 Binary files /dev/null and b/gobblin-core/src/test/resources/converter/fieldPickInput_arrays.avro differ http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/AvroStringFieldDecryptorConverter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/AvroStringFieldDecryptorConverter.java b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/AvroStringFieldDecryptorConverter.java new file mode 100644 index 0000000..59408c3 --- /dev/null +++ b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/AvroStringFieldDecryptorConverter.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.converter; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; + +import org.apache.gobblin.recordaccess.AvroGenericRecordAccessor; +import org.apache.gobblin.recordaccess.RecordAccessor; + + +/** + * StringFieldDecryptor that works on Avro GenericRecords. + */ +public class AvroStringFieldDecryptorConverter extends StringFieldDecryptorConverter<Schema, GenericRecord> { + @Override + protected RecordAccessor getRecordAccessor(GenericRecord record) { + return new AvroGenericRecordAccessor(record); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldDecryptorConverter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldDecryptorConverter.java b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldDecryptorConverter.java new file mode 100644 index 0000000..26f4bbd --- /dev/null +++ b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldDecryptorConverter.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.converter; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.commons.io.IOUtils; +import org.apache.gobblin.codec.StreamCodec; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.crypto.EncryptionConfigParser; +import org.apache.gobblin.crypto.EncryptionFactory; +import org.apache.gobblin.recordaccess.RecordAccessor; + +import com.google.common.base.Splitter; + + +/** + * Converter that can decrypt a string field in place. (Note: that means the incoming + * record will be mutated!). Assumes that the input field is of string + * type and that the decryption algorithm chosen will output a UTF-8 encoded byte array. 
+ */ +public abstract class StringFieldDecryptorConverter<SCHEMA, DATA> extends Converter<SCHEMA, SCHEMA, DATA, DATA> { + public static final String FIELDS_TO_DECRYPT_CONFIG_NAME = "converter.fieldsToDecrypt"; + + private StreamCodec decryptor; + private List<String> fieldsToDecrypt; + + @Override + public Converter<SCHEMA, SCHEMA, DATA, DATA> init(WorkUnitState workUnit) { + super.init(workUnit); + Map<String, Object> config = EncryptionConfigParser + .getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER_DECRYPT, getClass().getSimpleName(), workUnit); + decryptor = EncryptionFactory.buildStreamCryptoProvider(config); + + String fieldsToDecryptConfig = workUnit.getProp(FIELDS_TO_DECRYPT_CONFIG_NAME, null); + if (fieldsToDecryptConfig == null) { + throw new IllegalArgumentException("Must fill in the " + FIELDS_TO_DECRYPT_CONFIG_NAME + " config option!"); + } + + fieldsToDecrypt = Splitter.on(',').splitToList(fieldsToDecryptConfig); + + return this; + } + + @Override + public SCHEMA convertSchema(SCHEMA inputSchema, WorkUnitState workUnit) + throws SchemaConversionException { + return inputSchema; + } + + @Override + public Iterable<DATA> convertRecord(SCHEMA outputSchema, DATA inputRecord, WorkUnitState workUnit) + throws DataConversionException { + RecordAccessor accessor = getRecordAccessor(inputRecord); + + for (String field : fieldsToDecrypt) { + Map<String, Object> stringsToDecrypt = accessor.getMultiGeneric(field); + try { + for (Map.Entry<String, Object> entry : stringsToDecrypt.entrySet()) { + if (entry.getValue() instanceof String) { + String s = decryptString((String) entry.getValue()); + accessor.set(entry.getKey(), s); + } else if (entry.getValue() instanceof List) { + List<String> decryptedValues = new ArrayList<>(); + for (Object val : (List)entry.getValue()) { + if (!(val instanceof String)) { + throw new IllegalArgumentException("Expected List of Strings, but encountered a value of type " + + val.getClass().getCanonicalName()); + } + + 
decryptedValues.add(decryptString((String)val)); + } + + accessor.setStringArray(entry.getKey(), decryptedValues); + } else { + throw new IllegalArgumentException( + "Expected field to be of type String or List<String>, was " + entry.getValue().getClass() + .getCanonicalName()); + } + } + } catch (IOException | IllegalArgumentException | IllegalStateException e) { + throw new DataConversionException("Error while encrypting field " + field + ": " + e.getMessage(), e); + } + } + + return Collections.singleton(inputRecord); + } + + protected List<String> getFieldsToDecrypt() { + return fieldsToDecrypt; + } + + protected String decryptString(String val) + throws IOException { + byte[] encryptedBytes = val.getBytes(StandardCharsets.UTF_8); + + ByteArrayInputStream inStream = new ByteArrayInputStream(encryptedBytes); + + try (InputStream cipherStream = decryptor.decodeInputStream(inStream); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { + IOUtils.copy(cipherStream, outputStream); + + byte[] decryptedBytes = outputStream.toByteArray(); + return new String(decryptedBytes, StandardCharsets.UTF_8); + } + } + + protected abstract RecordAccessor getRecordAccessor(DATA record); +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java index f242ec5..e9b21ed 100644 --- a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java +++ b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java 
@@ -20,6 +20,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -72,30 +73,48 @@ public abstract class StringFieldEncryptorConverter<SCHEMA, DATA> extends Conver RecordAccessor accessor = getRecordAccessor(inputRecord); for (String field : fieldsToEncrypt) { - Map<String, String> stringsToEncrypt = accessor.getMultiAsString(field); - - for (Map.Entry<String, String> entry : stringsToEncrypt.entrySet()) { - byte[] bytes = entry.getValue().getBytes(StandardCharsets.UTF_8); - - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + Map<String, Object> stringsToEncrypt = accessor.getMultiGeneric(field); + for (Map.Entry<String, Object> entry : stringsToEncrypt.entrySet()) { try { - OutputStream cipherStream = encryptor.encodeOutputStream(outputStream); - cipherStream.write(bytes); - cipherStream.flush(); - cipherStream.close(); + if (entry.getValue() instanceof String) { + accessor.set(entry.getKey(), encryptString((String) entry.getValue())); + } else if (entry.getValue() instanceof List) { + List<String> encryptedVals = new ArrayList<>(); + + for (Object val: (List)entry.getValue()) { + if (!(val instanceof String)) { + throw new IllegalArgumentException("Unexpected type " + val.getClass().getCanonicalName() + + " while encrypting field " + field); + } + + encryptedVals.add(encryptString((String)val)); + } + + accessor.setStringArray(entry.getKey(), encryptedVals); + } } catch (IOException | IllegalArgumentException | IllegalStateException e) { throw new DataConversionException("Error while encrypting field " + field + ": " + e.getMessage(), e); } - - byte[] cipherBytes = outputStream.toByteArray(); - accessor.set(entry.getKey(), new String(cipherBytes, StandardCharsets.UTF_8)); } } return Collections.singleton(inputRecord); } + private String 
encryptString(String val) + throws IOException { + byte[] bytes = val.getBytes(StandardCharsets.UTF_8); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + + OutputStream cipherStream = encryptor.encodeOutputStream(outputStream); + cipherStream.write(bytes); + cipherStream.flush(); + cipherStream.close(); + return new String(outputStream.toByteArray(), StandardCharsets.UTF_8); + } + protected List<String> getFieldsToEncrypt() { return fieldsToEncrypt; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldDecryptorConverterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldDecryptorConverterTest.java b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldDecryptorConverterTest.java new file mode 100644 index 0000000..36be27d --- /dev/null +++ b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldDecryptorConverterTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.converter; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.collect.Maps; + +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.test.TestUtils; +import org.apache.gobblin.test.crypto.InsecureShiftCodec; + + +public class AvroStringFieldDecryptorConverterTest { + + @Test + public void testConversion() + throws DataConversionException, IOException, SchemaConversionException { + AvroStringFieldDecryptorConverter converter = new AvroStringFieldDecryptorConverter(); + WorkUnitState wuState = new WorkUnitState(); + + wuState.getJobState().setProp("converter.fieldsToDecrypt", "field1"); + wuState.getJobState().setProp("converter.decrypt.AvroStringFieldDecryptorConverter.algorithm", "insecure_shift"); + + converter.init(wuState); + GenericRecord inputRecord = TestUtils.generateRandomAvroRecord(); + + Schema inputSchema = inputRecord.getSchema(); + Schema outputSchema = converter.convertSchema(inputSchema, wuState); + + String fieldValue = (String) inputRecord.get("field1"); + + Iterable<GenericRecord> recordIt = converter.convertRecord(outputSchema, inputRecord, wuState); + GenericRecord decryptedRecord = recordIt.iterator().next(); + + Assert.assertEquals(outputSchema, inputSchema); + String decryptedValue = (String) decryptedRecord.get("field1"); + + InsecureShiftCodec codec = new InsecureShiftCodec(Maps.<String, 
Object>newHashMap()); + InputStream in = codec.decodeInputStream(new ByteArrayInputStream(fieldValue.getBytes(StandardCharsets.UTF_8))); + byte[] expectedDecryptedValue = new byte[in.available()]; + in.read(expectedDecryptedValue); + + Assert.assertEquals(new String(expectedDecryptedValue, StandardCharsets.UTF_8), decryptedValue); + } + + @Test + @SuppressWarnings("unchecked") + public void testArrayDecryption() + throws DataConversionException, IOException, SchemaConversionException { + AvroStringFieldDecryptorConverter converter = new AvroStringFieldDecryptorConverter(); + WorkUnitState wuState = new WorkUnitState(); + + wuState.getJobState().setProp("converter.fieldsToDecrypt", "array1"); + wuState.getJobState().setProp("converter.decrypt.AvroStringFieldDecryptorConverter.algorithm", "insecure_shift"); + + converter.init(wuState); + GenericRecord inputRecord = generateRecordWithArrays(); + + Schema inputSchema = inputRecord.getSchema(); + Schema outputSchema = converter.convertSchema(inputSchema, wuState); + + GenericData.Array<String> fieldValue = (GenericData.Array<String>) inputRecord.get("array1"); + + Iterable<GenericRecord> recordIt = converter.convertRecord(outputSchema, inputRecord, wuState); + GenericRecord decryptedRecord = recordIt.iterator().next(); + + Assert.assertEquals(outputSchema, inputSchema); + GenericData.Array<String> decryptedValue = (GenericData.Array<String>) decryptedRecord.get("array1"); + + for (int i = 0; i < decryptedValue.size(); i++) { + assertDecryptedValuesEqual(decryptedValue.get(i), fieldValue.get(i)); + } + } + + private void assertDecryptedValuesEqual(String decryptedValue, String originalValue) throws IOException { + InsecureShiftCodec codec = new InsecureShiftCodec(Maps.<String, Object>newHashMap()); + InputStream in = codec.decodeInputStream(new ByteArrayInputStream(originalValue.getBytes(StandardCharsets.UTF_8))); + byte[] expectedDecryptedValue = new byte[in.available()]; + in.read(expectedDecryptedValue); + + 
Assert.assertEquals(new String(expectedDecryptedValue, StandardCharsets.UTF_8), decryptedValue); + } + + private GenericRecord getRecordFromFile(String path) throws IOException { + DatumReader<GenericRecord> reader = new GenericDatumReader<>(); + DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(new File(path), reader); + while (dataFileReader.hasNext()) { + return dataFileReader.next(); + } + + return null; + } + + private GenericRecord generateRecordWithArrays() { + ArrayList<Schema.Field> fields = new ArrayList<Schema.Field>(); + String fieldName = "array1"; + Schema fieldSchema = Schema.createArray(Schema.create(Schema.Type.STRING)); + String docString = "doc"; + fields.add(new Schema.Field(fieldName, fieldSchema, docString, null)); + Schema schema = Schema.createRecord("name", docString, "test", false); + schema.setFields(fields); + + GenericData.Record record = new GenericData.Record(schema); + GenericData.Array<String> arr = new GenericData.Array<>(2, fieldSchema); + arr.add("foobar"); + arr.add("foobaz"); + + record.put("array1", arr); + + return record; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldEncryptorConverterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldEncryptorConverterTest.java b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldEncryptorConverterTest.java index e16401d..5078ae8 100644 --- a/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldEncryptorConverterTest.java +++ b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldEncryptorConverterTest.java @@ -17,6 +17,7 @@ package 
org.apache.gobblin.converter; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -26,9 +27,14 @@ import java.util.List; import org.apache.avro.Schema; import org.apache.avro.file.DataFileReader; +import org.apache.avro.generic.GenericArray; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DatumReader; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; import org.testng.Assert; import org.testng.annotations.Test; @@ -37,6 +43,7 @@ import com.google.common.collect.Maps; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.test.TestUtils; import org.apache.gobblin.test.crypto.InsecureShiftCodec; +import org.testng.collections.Lists; public class AvroStringFieldEncryptorConverterTest { @@ -83,10 +90,61 @@ public class AvroStringFieldEncryptorConverterTest { Assert.assertEquals(decryptedValues, origValues); } - private GenericRecord getRecordFromFile(String path) throws IOException { + @Test + @SuppressWarnings("unchecked") + public void testEncryptionOfArray() + throws SchemaConversionException, DataConversionException, IOException { + AvroStringFieldEncryptorConverter converter = new AvroStringFieldEncryptorConverter(); + WorkUnitState wuState = new WorkUnitState(); + + wuState.getJobState().setProp("converter.fieldsToEncrypt", "favorite_quotes"); + wuState.getJobState().setProp("converter.encrypt.algorithm", "insecure_shift"); + + converter.init(wuState); + GenericRecord inputRecord = + getRecordFromFile(getClass().getClassLoader().getResource("fieldPickInput_arrays.avro").getPath()); + GenericArray origValues = (GenericArray) inputRecord.get("favorite_quotes"); + for (int i = 0; i < origValues.size(); i++) { + origValues.set(i, 
origValues.get(i).toString()); + } + + Schema inputSchema = inputRecord.getSchema(); + Schema outputSchema = converter.convertSchema(inputSchema, wuState); + + Iterable<GenericRecord> recordIt = converter.convertRecord(outputSchema, inputRecord, wuState); + GenericRecord encryptedRecord = recordIt.iterator().next(); + + Assert.assertEquals(outputSchema, inputSchema); + + GenericArray<String> encryptedVals = (GenericArray<String>) encryptedRecord.get("favorite_quotes"); + List<String> decryptedValues = Lists.newArrayList(); + for (String encryptedValue: encryptedVals) { + InsecureShiftCodec codec = new InsecureShiftCodec(Maps.<String, Object>newHashMap()); + InputStream in = + codec.decodeInputStream(new ByteArrayInputStream(encryptedValue.getBytes(StandardCharsets.UTF_8))); + byte[] decryptedValue = new byte[in.available()]; + in.read(decryptedValue); + decryptedValues.add(new String(decryptedValue, StandardCharsets.UTF_8)); + } + + Assert.assertEquals(decryptedValues, origValues); + } + + private GenericArray<String> buildTestArray() { + Schema s = Schema.createArray(Schema.create(Schema.Type.STRING)); + GenericArray<String> arr = new GenericData.Array<>(3, s); + arr.add("one"); + arr.add("two"); + arr.add("three"); + + return arr; + } + + private GenericRecord getRecordFromFile(String path) + throws IOException { DatumReader<GenericRecord> reader = new GenericDatumReader<>(); DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(new File(path), reader); - while (dataFileReader.hasNext()) { + if (dataFileReader.hasNext()) { return dataFileReader.next(); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-modules/gobblin-crypto-provider/src/test/resources/fieldPickInput_arrays.avro ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-crypto-provider/src/test/resources/fieldPickInput_arrays.avro 
b/gobblin-modules/gobblin-crypto-provider/src/test/resources/fieldPickInput_arrays.avro new file mode 100644 index 0000000..c10a607 Binary files /dev/null and b/gobblin-modules/gobblin-crypto-provider/src/test/resources/fieldPickInput_arrays.avro differ
