Repository: phoenix Updated Branches: refs/heads/4.x-HBase-0.98 bdfba4a55 -> 6acaaf6c0
Revert "PHOENIX-1973 Improve CsvBulkLoadTool performance by moving keyvalue construction from map phase to reduce phase(Sergey Soldatov)" This reverts commit 0b80eef660a57d4ed045ce6e070e7133a3610690. Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6acaaf6c Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6acaaf6c Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6acaaf6c Branch: refs/heads/4.x-HBase-0.98 Commit: 6acaaf6c0f196622e65b5632eadd58a6af46e6de Parents: bdfba4a Author: James Taylor <[email protected]> Authored: Sat Feb 27 10:07:43 2016 -0800 Committer: James Taylor <[email protected]> Committed: Sat Feb 27 10:07:43 2016 -0800 ---------------------------------------------------------------------- .../phoenix/mapreduce/AbstractBulkLoadTool.java | 6 +- .../mapreduce/FormatToKeyValueMapper.java | 164 +++---------------- .../mapreduce/FormatToKeyValueReducer.java | 127 ++------------ .../bulkload/TargetTableRefFunctions.java | 22 +-- 4 files changed, 38 insertions(+), 281 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/6acaaf6c/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java index ab2848f..39ee4b1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java @@ -41,7 +41,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; @@ -269,7 +268,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool { job.setInputFormatClass(TextInputFormat.class); job.setMapOutputKeyClass(TableRowkeyPair.class); - job.setMapOutputValueClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(KeyValue.class); job.setOutputKeyClass(TableRowkeyPair.class); job.setOutputValueClass(KeyValue.class); job.setReducerClass(FormatToKeyValueReducer.class); @@ -277,10 +276,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool { MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded); final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded); - final String logicalNamesAsJson = TargetTableRefFunctions.LOGICAN_NAMES_TO_JSON.apply(tablesToBeLoaded); - job.getConfiguration().set(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson); - job.getConfiguration().set(FormatToKeyValueMapper.LOGICAL_NAMES_CONFKEY,logicalNamesAsJson); // give subclasses their hook setupJob(job); http://git-wip-us.apache.org/repos/asf/phoenix/blob/6acaaf6c/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java index 95b099e..7e115e5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java @@ -17,30 +17,30 @@ */ package org.apache.phoenix.mapreduce; -import java.io.DataOutputStream; import java.io.IOException; import java.sql.SQLException; -import java.util.*; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; import javax.annotation.Nullable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapreduce.Mapper; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair; import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; -import org.apache.phoenix.query.QueryConstants; -import org.apache.phoenix.schema.PColumn; -import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.util.*; +import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.UpsertExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +59,7 @@ import com.google.common.collect.Lists; * to retrieve {@link ColumnInfo} from the target table. */ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable, Text, TableRowkeyPair, - ImmutableBytesWritable> { + KeyValue> { protected static final Logger LOG = LoggerFactory.getLogger(FormatToKeyValueMapper.class); @@ -79,7 +79,6 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable /** Configuration key for the table names */ public static final String TABLE_NAMES_CONFKEY = "phoenix.mapreduce.import.tablenames"; - public static final String LOGICAL_NAMES_CONFKEY = "phoenix.mapreduce.import.logicalnames"; /** Configuration key for the table configurations */ public static final String TABLE_CONFIG_CONFKEY = "phoenix.mapreduce.import.table.config"; @@ -95,14 +94,8 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable protected UpsertExecutor<RECORD, ?> upsertExecutor; protected ImportPreUpsertKeyValueProcessor preUpdateProcessor; protected List<String> tableNames; - protected List<String> logicalNames; protected MapperUpsertListener<RECORD> upsertListener; - /* - lookup table for column index. Index in the List matches to the index in tableNames List - */ - protected List<Map<byte[], Map<byte[], Integer>>> columnIndexes; - protected abstract UpsertExecutor<RECORD,?> buildUpsertExecutor(Configuration conf); protected abstract LineParser<RECORD> getLineParser(); @@ -119,17 +112,13 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable try { conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf); - - final String tableNamesConf = conf.get(TABLE_NAMES_CONFKEY); - final String logicalNamesConf = conf.get(LOGICAL_NAMES_CONFKEY); - tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf); - logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf); - - columnIndexes = initColumnIndexes(); } catch (SQLException | ClassNotFoundException e) { throw new RuntimeException(e); } + final String tableNamesConf = conf.get(TABLE_NAMES_CONFKEY); + tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf); + upsertListener = new MapperUpsertListener<RECORD>( context, conf.getBoolean(IGNORE_INVALID_ROW_CONFKEY, true)); upsertExecutor = buildUpsertExecutor(conf); @@ -138,8 +127,7 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable @SuppressWarnings("deprecation") @Override - protected void map(LongWritable key, Text value, Context context) throws IOException, - InterruptedException { + protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if (conn == null) { throw new RuntimeException("Connection not initialized."); } @@ -157,7 +145,7 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable return; } upsertExecutor.execute(ImmutableList.<RECORD>of(record)); - Map<Integer, List<KeyValue>> map = new HashMap<>(); + Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator = PhoenixRuntime.getUncommittedDataIterator(conn, true); while (uncommittedDataIterator.hasNext()) { @@ -165,125 +153,24 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable List<KeyValue> keyValueList = kvPair.getSecond(); keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList); byte[] first = kvPair.getFirst(); - // Create a list of KV for each table - for (int i = 0; i < tableNames.size(); i++) { - if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), first) == 0) { - if (!map.containsKey(i)) { - map.put(i, new ArrayList<KeyValue>()); - } - List<KeyValue> list = map.get(i); - for (KeyValue kv : keyValueList) { - list.add(kv); - } - break; + for (String tableName : tableNames) { + if (Bytes.compareTo(Bytes.toBytes(tableName), first) != 0) { + // skip edits for other tables + continue; + } + for (KeyValue kv : keyValueList) { + ImmutableBytesWritable outputKey = new ImmutableBytesWritable(); + outputKey.set(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()); + context.write(new TableRowkeyPair(tableName, outputKey), kv); } } } - for(Map.Entry<Integer, List<KeyValue>> rowEntry : map.entrySet()) { - int tableIndex = rowEntry.getKey(); - List<KeyValue> lkv = rowEntry.getValue(); - // All KV values combines to a single byte array - writeAggregatedRow(context, tableIndex, lkv); - } conn.rollback(); } catch (Exception e) { throw new RuntimeException(e); } } - private List<Map<byte[],Map<byte[], Integer>>> initColumnIndexes() throws SQLException { - List<Map<byte[], Map<byte[], Integer>>> tableMap = new ArrayList<>(); - int tableIndex; - for (tableIndex = 0; tableIndex < tableNames.size(); tableIndex++) { - PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(tableIndex)); - Map<byte[], Map<byte[], Integer>> columnMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); - List<PColumn> cls = table.getColumns(); - for(int i = 0; i < cls.size(); i++) { - PColumn c = cls.get(i); - if(c.getFamilyName() == null) continue; // Skip PK column - byte[] family = c.getFamilyName().getBytes(); - byte[] name = c.getName().getBytes(); - if(!columnMap.containsKey(family)) { - columnMap.put(family, new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR)); - } - Map<byte[], Integer> qualifier = columnMap.get(family); - qualifier.put(name, i); - } - tableMap.add(columnMap); - } - return tableMap; - } - - /** - * Find the column index which will replace the column name in - * the aggregated array and will be restored in Reducer - * - * @param tableIndex Table index in tableNames list - * @param cell KeyValue for the column - * @return column index for the specified cell or -1 if was not found - */ - private int findIndex(int tableIndex, Cell cell) { - Map<byte[], Map<byte[], Integer>> columnMap = columnIndexes.get(tableIndex); - Map<byte[], Integer> qualifiers = columnMap.get(Bytes.copy(cell.getFamilyArray(), - cell.getFamilyOffset(), cell.getFamilyLength())); - if(qualifiers!= null) { - Integer result = qualifiers.get(Bytes.copy(cell.getQualifierArray(), - cell.getQualifierOffset(), cell.getQualifierLength())); - if(result!=null) { - return result; - } - } - return -1; - } - - /** - * Collect all column values for the same rowKey - * - * @param context Current mapper context - * @param tableIndex Table index in tableNames list - * @param lkv List of KV values that will be combined in a single ImmutableBytesWritable - * @throws IOException - * @throws InterruptedException - */ - - private void writeAggregatedRow(Context context, int tableIndex, List<KeyValue> lkv) - throws IOException, InterruptedException { - TrustedByteArrayOutputStream bos = new TrustedByteArrayOutputStream(1024); - DataOutputStream outputStream = new DataOutputStream(bos); - ImmutableBytesWritable outputKey = new ImmutableBytesWritable(); - if (!lkv.isEmpty()) { - // All Key Values for the same row are supposed to be the same, so init rowKey only once - Cell first = lkv.get(0); - outputKey.set(first.getRowArray(), first.getRowOffset(), first.getRowLength()); - for (KeyValue cell : lkv) { - if(isEmptyCell(cell)) { - continue; - } - int i = findIndex(tableIndex, cell); - if (i == -1) { - throw new IOException("No column found for KeyValue"); - } - WritableUtils.writeVInt(outputStream, i); - outputStream.writeByte(cell.getTypeByte()); - WritableUtils.writeVInt(outputStream, cell.getValueLength()); - outputStream.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); - } - } - ImmutableBytesWritable aggregatedArray = new ImmutableBytesWritable(bos.toByteArray()); - outputStream.close(); - context.write(new TableRowkeyPair(Integer.toString(tableIndex), outputKey), aggregatedArray); - } - - protected boolean isEmptyCell(KeyValue cell) { - if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), - cell.getQualifierLength(), QueryConstants.EMPTY_COLUMN_BYTES, 0, - QueryConstants.EMPTY_COLUMN_BYTES.length) != 0) - return false; - else - return true; - } - - @Override protected void cleanup(Context context) throws IOException, InterruptedException { try { @@ -336,12 +223,11 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable @VisibleForTesting static class MapperUpsertListener<T> implements UpsertExecutor.UpsertListener<T> { - private final Mapper<LongWritable, Text, - TableRowkeyPair, ImmutableBytesWritable>.Context context; + private final Mapper<LongWritable, Text, TableRowkeyPair, KeyValue>.Context context; private final boolean ignoreRecordErrors; private MapperUpsertListener( - Mapper<LongWritable, Text, TableRowkeyPair, ImmutableBytesWritable>.Context context, + Mapper<LongWritable, Text, TableRowkeyPair, KeyValue>.Context context, boolean ignoreRecordErrors) { this.context = context; this.ignoreRecordErrors = ignoreRecordErrors; http://git-wip-us.apache.org/repos/asf/phoenix/blob/6acaaf6c/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java index 0f90e45..5d00656 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java @@ -17,143 +17,36 @@ */ package org.apache.phoenix.mapreduce; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; import java.io.IOException; -import java.sql.SQLException; -import java.util.*; +import java.util.TreeSet; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapreduce.Reducer; -import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; -import org.apache.phoenix.hbase.index.util.KeyValueBuilder; -import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair; -import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions; -import org.apache.phoenix.query.QueryConstants; -import org.apache.phoenix.schema.PColumn; -import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.util.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Reducer class for the bulkload jobs. * Performs similar functionality to {@link KeyValueSortReducer} */ public class FormatToKeyValueReducer - extends Reducer<TableRowkeyPair,ImmutableBytesWritable,TableRowkeyPair,KeyValue> { - - protected static final Logger LOG = LoggerFactory.getLogger(FormatToKeyValueReducer.class); - - - protected List<String> tableNames; - protected List<String> logicalNames; - protected KeyValueBuilder builder; - List<List<Pair<byte[], byte[]>>> columnIndexes; - List<ImmutableBytesPtr> emptyFamilyName; - - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - Configuration conf = context.getConfiguration(); - - // pass client configuration into driver - Properties clientInfos = new Properties(); - for (Map.Entry<String, String> entry : conf) { - clientInfos.setProperty(entry.getKey(), entry.getValue()); - } - - try { - PhoenixConnection conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf); - builder = conn.getKeyValueBuilder(); - final String tableNamesConf = conf.get(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY); - final String logicalNamesConf = conf.get(FormatToKeyValueMapper.LOGICAL_NAMES_CONFKEY); - tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf); - logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf); - - columnIndexes = new ArrayList<>(tableNames.size()); - emptyFamilyName = new ArrayList<>(); - initColumnsMap(conn); - } catch (SQLException | ClassNotFoundException e) { - throw new RuntimeException(e); - } - } - - private void initColumnsMap(PhoenixConnection conn) throws SQLException { - for (String tableName : logicalNames) { - PTable table = PhoenixRuntime.getTable(conn, tableName); - emptyFamilyName.add(SchemaUtil.getEmptyColumnFamilyPtr(table)); - List<PColumn> cls = table.getColumns(); - List<Pair<byte[], byte[]>> list = new ArrayList(cls.size()); - for(int i = 0; i < cls.size(); i++) { - PColumn c = cls.get(i); - if(c.getFamilyName() == null) { - list.add(null); // Skip PK column - continue; - } - byte[] family = c.getFamilyName().getBytes(); - byte[] name = c.getName().getBytes(); - list.add(new Pair(family, name)); - } - columnIndexes.add(list); - } - - } + extends Reducer<TableRowkeyPair,KeyValue,TableRowkeyPair,KeyValue> { @Override - protected void reduce(TableRowkeyPair key, Iterable<ImmutableBytesWritable> values, - Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue>.Context context) + protected void reduce(TableRowkeyPair key, Iterable<KeyValue> values, + Reducer<TableRowkeyPair, KeyValue, TableRowkeyPair, KeyValue>.Context context) throws IOException, InterruptedException { TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR); - int tableIndex = Integer.parseInt(key.getTableName()); - key.setTableName(tableNames.get(tableIndex)); - List<Pair<byte[], byte[]>> columns = columnIndexes.get(tableIndex); - for (ImmutableBytesWritable aggregatedArray : values) { - DataInputStream input = new DataInputStream(new ByteArrayInputStream(aggregatedArray.get())); - while (input.available()!= 0) { - int index = WritableUtils.readVInt(input); - Pair<byte[], byte[]> pair = columns.get(index); - byte type = input.readByte(); - ImmutableBytesWritable value = null; - int len = WritableUtils.readVInt(input); - if (len > 0) { - byte[] array = new byte[len]; - input.read(array); - value = new ImmutableBytesWritable(array); - } - KeyValue kv; - KeyValue.Type kvType = KeyValue.Type.codeToType(type); - switch (kvType) { - case Put: // not null value - kv = builder.buildPut(key.getRowkey(), - new ImmutableBytesWritable(pair.getFirst()), - new ImmutableBytesWritable(pair.getSecond()), value); - break; - case DeleteColumn: // null value - kv = builder.buildDeleteColumns(key.getRowkey(), - new ImmutableBytesWritable(pair.getFirst()), - new ImmutableBytesWritable(pair.getSecond())); - break; - default: - throw new IOException("Unsupported KeyValue type " + kvType); - } - map.add(kv); + for (KeyValue kv: values) { + try { + map.add(kv.clone()); + } catch (CloneNotSupportedException e) { + throw new java.io.IOException(e); } - KeyValue empty = builder.buildPut(key.getRowkey(), - emptyFamilyName.get(tableIndex), - QueryConstants.EMPTY_COLUMN_BYTES_PTR, ByteUtil.EMPTY_BYTE_ARRAY_PTR); - map.add(empty); - Closeables.closeQuietly(input); } context.setStatus("Read " + map.getClass()); int index = 0; - for (KeyValue kv : map) { + for (KeyValue kv: map) { context.write(key, kv); if (++index % 100 == 0) context.setStatus("Wrote " + index); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6acaaf6c/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java index e02065f..d786842 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java @@ -78,25 +78,7 @@ public class TargetTableRefFunctions { } }; - public static Function<List<TargetTableRef>,String> LOGICAN_NAMES_TO_JSON = new Function<List<TargetTableRef>,String>() { - - @Override - public String apply(List<TargetTableRef> input) { - try { - List<String> tableNames = Lists.newArrayListWithCapacity(input.size()); - for(TargetTableRef table : input) { - tableNames.add(table.getLogicalName()); - } - ObjectMapper mapper = new ObjectMapper(); - return mapper.writeValueAsString(tableNames); - } catch (IOException e) { - throw new RuntimeException(e); - } - - } - }; - - public static Function<String,List<String>> NAMES_FROM_JSON = new Function<String,List<String>>() { + public static Function<String,List<String>> NAMES_FROM_JSON = new Function<String,List<String>>() { @SuppressWarnings("unchecked") @Override @@ -110,4 +92,4 @@ public class TargetTableRefFunctions { } }; -} + }
