Repository: phoenix Updated Branches: refs/heads/master 8ce3b580f -> 578979a14
http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapperTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapperTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapperTest.java new file mode 100644 index 0000000..3455616 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapperTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.schema.types.PInteger; +import org.apache.phoenix.schema.types.PIntegerArray; +import org.apache.phoenix.schema.types.PUnsignedInt; +import org.apache.phoenix.util.ColumnInfo; +import org.junit.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +import static org.junit.Assert.assertEquals; + +public class FormatToKeyValueMapperTest { + + @Test + public void testBuildColumnInfoList() { + List<ColumnInfo> columnInfoList = ImmutableList.of( + new ColumnInfo("idCol", PInteger.INSTANCE.getSqlType()), + new ColumnInfo("unsignedIntCol", PUnsignedInt.INSTANCE.getSqlType()), + new ColumnInfo("stringArrayCol", PIntegerArray.INSTANCE.getSqlType())); + + Configuration conf = new Configuration(); + FormatToKeyValueMapper.configureColumnInfoList(conf, columnInfoList); + List<ColumnInfo> fromConfig = FormatToKeyValueMapper.buildColumnInfoList(conf); + + assertEquals(columnInfoList, fromConfig); + } + + @Test + public void testBuildColumnInfoList_ContainingNulls() { + // A null value in the column info list means "skip that column in the input" + List<ColumnInfo> columnInfoListWithNull = Lists.newArrayList( + new ColumnInfo("idCol", PInteger.INSTANCE.getSqlType()), + null, + new ColumnInfo("unsignedIntCol", PUnsignedInt.INSTANCE.getSqlType()), + new ColumnInfo("stringArrayCol", PIntegerArray.INSTANCE.getSqlType())); + + Configuration conf = new Configuration(); + FormatToKeyValueMapper.configureColumnInfoList(conf, columnInfoListWithNull); + List<ColumnInfo> fromConfig = FormatToKeyValueMapper.buildColumnInfoList(conf); + + assertEquals(columnInfoListWithNull, fromConfig); + } + + @Test + public void testLoadPreUpdateProcessor() { + Configuration conf = new Configuration(); + conf.setClass(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, MockUpsertProcessor.class, + ImportPreUpsertKeyValueProcessor.class); + + ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf); + assertEquals(MockUpsertProcessor.class, processor.getClass()); + } + + @Test + public void testLoadPreUpdateProcessor_NotConfigured() { + + Configuration conf = new Configuration(); + ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf); + + assertEquals(FormatToKeyValueMapper.DefaultImportPreUpsertKeyValueProcessor.class, + processor.getClass()); + } + + @Test(expected=IllegalStateException.class) + public void testLoadPreUpdateProcessor_ClassNotFound() { + Configuration conf = new Configuration(); + conf.set(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, "MyUndefinedClass"); + + PhoenixConfigurationUtil.loadPreUpsertProcessor(conf); + } + + static class MockUpsertProcessor implements ImportPreUpsertKeyValueProcessor { + @Override + public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues) { + throw new UnsupportedOperationException("Not yet implemented"); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java new file mode 100644 index 0000000..b614312 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.util; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Types; +import java.util.Arrays; +import java.util.List; + +import org.apache.phoenix.query.BaseConnectionlessQueryTest; +import org.apache.phoenix.schema.types.PArrayDataType; +import org.apache.phoenix.schema.types.PInteger; +import org.apache.phoenix.schema.types.PIntegerArray; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableList; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionlessQueryTest { + + protected Connection conn; + protected List<ColumnInfo> columnInfoList; + protected PreparedStatement preparedStatement; + protected UpsertExecutor.UpsertListener<R> upsertListener; + + protected abstract UpsertExecutor<R, F> getUpsertExecutor(); + protected abstract R createRecord(Object... columnValues) throws IOException; + + @Before + public void setUp() throws SQLException { + columnInfoList = ImmutableList.of( + new ColumnInfo("ID", Types.BIGINT), + new ColumnInfo("NAME", Types.VARCHAR), + new ColumnInfo("AGE", Types.INTEGER), + new ColumnInfo("VALUES", PIntegerArray.INSTANCE.getSqlType())); + + preparedStatement = mock(PreparedStatement.class); + upsertListener = mock(UpsertExecutor.UpsertListener.class); + conn = DriverManager.getConnection(getUrl()); + } + + @After + public void tearDown() throws SQLException { + conn.close(); + } + + @Test + public void testExecute() throws Exception { + getUpsertExecutor().execute(createRecord(123L, "NameValue", 42, Arrays.asList(1, 2, 3))); + + verify(upsertListener).upsertDone(1L); + verifyNoMoreInteractions(upsertListener); + + verify(preparedStatement).setObject(1, Long.valueOf(123L)); + verify(preparedStatement).setObject(2, "NameValue"); + verify(preparedStatement).setObject(3, Integer.valueOf(42)); + verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE, new Object[]{1,2,3})); + verify(preparedStatement).execute(); + verifyNoMoreInteractions(preparedStatement); + } + + @Test + public void testExecute_TooFewFields() throws Exception { + R recordWithTooFewFields = createRecord(123L, "NameValue"); + getUpsertExecutor().execute(recordWithTooFewFields); + + verify(upsertListener).errorOnRecord(eq(recordWithTooFewFields), any(Throwable.class)); + verifyNoMoreInteractions(upsertListener); + } + + @Test + public void testExecute_TooManyFields() throws Exception { + R recordWithTooManyFields = createRecord(123L, "NameValue", 42, Arrays.asList(1, 2, 3), "Garbage"); + getUpsertExecutor().execute(recordWithTooManyFields); + + verify(upsertListener).upsertDone(1L); + verifyNoMoreInteractions(upsertListener); + + verify(preparedStatement).setObject(1, Long.valueOf(123L)); + verify(preparedStatement).setObject(2, "NameValue"); + verify(preparedStatement).setObject(3, Integer.valueOf(42)); + verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE, new Object[]{1,2,3})); + verify(preparedStatement).execute(); + verifyNoMoreInteractions(preparedStatement); + } + + @Test + public void testExecute_NullField() throws Exception { + getUpsertExecutor().execute(createRecord(123L, "NameValue", null, Arrays.asList(1, 2, 3))); + + verify(upsertListener).upsertDone(1L); + verifyNoMoreInteractions(upsertListener); + + verify(preparedStatement).setObject(1, Long.valueOf(123L)); + verify(preparedStatement).setObject(2, "NameValue"); + verify(preparedStatement).setNull(3, columnInfoList.get(2).getSqlType()); + verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE, new Object[]{1,2,3})); + verify(preparedStatement).execute(); + verifyNoMoreInteractions(preparedStatement); + } + + @Test + public void testExecute_InvalidType() throws Exception { + R recordWithInvalidType = createRecord(123L, "NameValue", "ThisIsNotANumber", Arrays.asList(1, 2, 3)); + getUpsertExecutor().execute(recordWithInvalidType); + + verify(upsertListener).errorOnRecord(eq(recordWithInvalidType), any(Throwable.class)); + verifyNoMoreInteractions(upsertListener); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java index 6efe246..7a09bee 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java @@ -17,130 +17,50 @@ */ package org.apache.phoenix.util.csv; -import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import org.apache.commons.csv.CSVFormat; -import org.apache.commons.csv.CSVParser; -import org.apache.commons.csv.CSVRecord; -import org.apache.phoenix.query.BaseConnectionlessQueryTest; -import org.apache.phoenix.schema.types.PInteger; -import org.apache.phoenix.schema.types.PIntegerArray; -import org.apache.phoenix.schema.types.PArrayDataType; -import org.apache.phoenix.util.ColumnInfo; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; import java.sql.SQLException; -import java.sql.Types; import java.util.List; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; - -public class CsvUpsertExecutorTest extends BaseConnectionlessQueryTest { - - private Connection conn; - private List<ColumnInfo> columnInfoList; - private PreparedStatement preparedStatement; - private CsvUpsertExecutor.UpsertListener upsertListener; - - private CsvUpsertExecutor upsertExecutor; - - @Before - public void setUp() throws SQLException { - columnInfoList = ImmutableList.of( - new ColumnInfo("ID", Types.BIGINT), - new ColumnInfo("NAME", Types.VARCHAR), - new ColumnInfo("AGE", Types.INTEGER), - new ColumnInfo("VALUES", PIntegerArray.INSTANCE.getSqlType())); - - preparedStatement = mock(PreparedStatement.class); - upsertListener = mock(CsvUpsertExecutor.UpsertListener.class); - conn = DriverManager.getConnection(getUrl()); - upsertExecutor = new CsvUpsertExecutor(conn, columnInfoList, preparedStatement, upsertListener, ":"); - } - - @After - public void tearDown() throws SQLException { - conn.close(); - } - - @Test - public void testExecute() throws Exception { - upsertExecutor.execute(createCsvRecord("123,NameValue,42,1:2:3")); - - verify(upsertListener).upsertDone(1L); - verifyNoMoreInteractions(upsertListener); - - verify(preparedStatement).setObject(1, Long.valueOf(123L)); - verify(preparedStatement).setObject(2, "NameValue"); - verify(preparedStatement).setObject(3, Integer.valueOf(42)); - verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE, new Object[]{1,2,3})); - verify(preparedStatement).execute(); - verifyNoMoreInteractions(preparedStatement); - } - - @Test - public void testExecute_TooFewFields() throws Exception { - CSVRecord csvRecordWithTooFewFields = createCsvRecord("123,NameValue"); - upsertExecutor.execute(csvRecordWithTooFewFields); - - verify(upsertListener).errorOnRecord(eq(csvRecordWithTooFewFields), any(Throwable.class)); - verifyNoMoreInteractions(upsertListener); - } - - @Test - public void testExecute_TooManyFields() throws Exception { - CSVRecord csvRecordWithTooManyFields = createCsvRecord("123,NameValue,42,1:2:3,Garbage"); - upsertExecutor.execute(csvRecordWithTooManyFields); - - verify(upsertListener).upsertDone(1L); - verifyNoMoreInteractions(upsertListener); - - verify(preparedStatement).setObject(1, Long.valueOf(123L)); - verify(preparedStatement).setObject(2, "NameValue"); - verify(preparedStatement).setObject(3, Integer.valueOf(42)); - verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE, new Object[]{1,2,3})); - verify(preparedStatement).execute(); - verifyNoMoreInteractions(preparedStatement); - } +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; +import org.apache.phoenix.util.AbstractUpsertExecutorTest; +import org.apache.phoenix.util.UpsertExecutor; +import org.junit.Before; - @Test - public void testExecute_NullField() throws Exception { - upsertExecutor.execute(createCsvRecord("123,NameValue,,1:2:3")); +import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; - verify(upsertListener).upsertDone(1L); - verifyNoMoreInteractions(upsertListener); +public class CsvUpsertExecutorTest extends AbstractUpsertExecutorTest<CSVRecord, String> { - verify(preparedStatement).setObject(1, Long.valueOf(123L)); - verify(preparedStatement).setObject(2, "NameValue"); - verify(preparedStatement).setNull(3, columnInfoList.get(2).getSqlType()); - verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE, new Object[]{1,2,3})); - verify(preparedStatement).execute(); - verifyNoMoreInteractions(preparedStatement); - } + private static final String ARRAY_SEP = ":"; - @Test - public void testExecute_InvalidType() throws Exception { - CSVRecord csvRecordWithInvalidType = createCsvRecord("123,NameValue,ThisIsNotANumber,1:2:3"); - upsertExecutor.execute(csvRecordWithInvalidType); + private UpsertExecutor<CSVRecord, String> upsertExecutor; - verify(upsertListener).errorOnRecord(eq(csvRecordWithInvalidType), any(Throwable.class)); - verifyNoMoreInteractions(upsertListener); + @Override + public UpsertExecutor<CSVRecord, String> getUpsertExecutor() { + return upsertExecutor; } - private CSVRecord createCsvRecord(String...columnValues) throws IOException { + @Override + public CSVRecord createRecord(Object... columnValues) throws IOException { + for (int i = 0; i < columnValues.length; i++) { + if (columnValues[i] == null) { + // Joiner.join throws on nulls, replace with empty string. + columnValues[i] = ""; + } + if (columnValues[i] instanceof List) { + columnValues[i] = Joiner.on(ARRAY_SEP).join((List<?>) columnValues[i]); + } + } String inputRecord = Joiner.on(',').join(columnValues); return Iterables.getFirst(CSVParser.parse(inputRecord, CSVFormat.DEFAULT), null); } + + @Before + public void setUp() throws SQLException { + super.setUp(); + upsertExecutor = new CsvUpsertExecutor(conn, columnInfoList, preparedStatement, + upsertListener, ARRAY_SEP); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/test/java/org/apache/phoenix/util/json/JsonUpsertExecutorTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/json/JsonUpsertExecutorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/json/JsonUpsertExecutorTest.java new file mode 100644 index 0000000..c042dd4 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/json/JsonUpsertExecutorTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.util.json; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.phoenix.util.AbstractUpsertExecutorTest; +import org.apache.phoenix.util.UpsertExecutor; +import org.junit.Before; + +public class JsonUpsertExecutorTest extends AbstractUpsertExecutorTest<Map<?, ?>, Object> { + + private UpsertExecutor<Map<?, ?>, Object> upsertExecutor; + + @Override + protected UpsertExecutor<Map<?, ?>, Object> getUpsertExecutor() { + return upsertExecutor; + } + + @Override + protected Map<?, ?> createRecord(Object... columnValues) throws IOException { + Map ret = new HashMap(columnValues.length); + int min = Math.min(columnInfoList.size(), columnValues.length); + for (int i = 0; i < min; i++) { + ret.put(columnInfoList.get(i).getColumnName().replace("\"", "").toLowerCase(), columnValues[i]); + } + return ret; + } + + @Before + public void setUp() throws SQLException { + super.setUp(); + upsertExecutor = new JsonUpsertExecutor(conn, columnInfoList, preparedStatement, upsertListener); + } +}