This is an automated email from the ASF dual-hosted git repository. snazy pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit a76d288c32e9a062dfafc8e38a33b8aed064913e Merge: f3198c4 d51c18f Author: Yifan Cai <[email protected]> AuthorDate: Wed Jul 22 11:11:25 2020 +0200 Merge branch 'cassandra-3.11' into trunk CHANGES.txt | 4 + src/java/org/apache/cassandra/cql3/CQL3Type.java | 37 ++-- .../schema/CreateAggregateStatement.java | 4 +- .../statements/schema/CreateFunctionStatement.java | 4 +- .../statements/schema/DropAggregateStatement.java | 2 +- .../statements/schema/DropFunctionStatement.java | 2 +- .../cassandra/cql3/validation/entities/UFTest.java | 13 ++ .../cql3/validation/entities/UFTypesTest.java | 34 ++-- .../validation/operations/AggregationTest.java | 46 ++++- .../cassandra/io/sstable/CQLSSTableWriterTest.java | 203 ++++++++++----------- 10 files changed, 200 insertions(+), 149 deletions(-) diff --cc CHANGES.txt index afb3d09,22e7d1a..b56ee27 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,64 -1,8 +1,68 @@@ -3.11.8 +4.0-beta2 + * Verify sstable components on startup (CASSANDRA-15945) ++Merged from 3.11: + * Frozen RawTuple is not annotated with frozen in the toString method (CASSANDRA-15857) + + -3.11.7 +4.0-beta1 + * Remove BackPressureStrategy (CASSANDRA-15375) + * Improve messaging on indexing frozen collections (CASSANDRA-15908) + * USING_G1 is incorrectly set in cassandra-env.sh if G1 is explicitly disabled with -UseG1GC (CASSANDRA-15931) + * Update compaction_throughput_mb_per_sec throttle default to 64 (CASSANDRA-14902) + * Add option to disable compaction at startup (CASSANDRA-15927) + * FBUtilities.getJustLocalAddress falls back to lo ip on misconfigured nodes (CASSANDRA-15901) + * Close channel and reduce buffer allocation during entire sstable streaming with SSL (CASSANDRA-15900) + * Prune expired messages less frequently in internode messaging (CASSANDRA-15700) + * Fix Ec2Snitch handling of legacy mode for dc names matching both formats, eg "us-west-2" (CASSANDRA-15878) + * Add support for server side DESCRIBE statements (CASSANDRA-14825) + * Fail startup if -Xmn is set when the G1 garbage collector is used (CASSANDRA-15839) + * generateSplits method replaced the generateRandomTokens for ReplicationAwareTokenAllocator. (CASSANDRA-15877) + * Several mbeans are not unregistered when dropping a keyspace and table (CASSANDRA-14888) + * Update defaults for server and client TLS settings (CASSANDRA-15262) + * Differentiate follower/initator in StreamMessageHeader (CASSANDRA-15665) + * Add a startup check to detect if LZ4 uses java rather than native implementation (CASSANDRA-15884) + * Fix missing topology events when running multiple nodes on the same network interface (CASSANDRA-15677) + * Create config.yml.MIDRES (CASSANDRA-15712) + * Fix handling of fully purged static rows in repaired data tracking (CASSANDRA-15848) + * Prevent validation request submission from blocking ANTI_ENTROPY stage (CASSANDRA-15812) + * Add fqltool and auditlogviewer to rpm and deb packages (CASSANDRA-14712) + * Include DROPPED_COLUMNS in schema digest computation (CASSANDRA-15843) + * Fix Cassandra restart from rpm install (CASSANDRA-15830) + * Improve handling of 2i initialization failures (CASSANDRA-13606) + * Add completion_ratio column to sstable_tasks virtual table (CASANDRA-15759) + * Add support for adding custom Verbs (CASSANDRA-15725) + * Speed up entire-file-streaming file containment check and allow entire-file-streaming for all compaction strategies (CASSANDRA-15657,CASSANDRA-15783) + * Provide ability to configure IAuditLogger (CASSANDRA-15748) + * Fix nodetool enablefullquerylog blocking param parsing (CASSANDRA-15819) + * Add isTransient to SSTableMetadataView (CASSANDRA-15806) + * Fix tools/bin/fqltool for all shells (CASSANDRA-15820) + * Fix clearing of legacy size_estimates (CASSANDRA-15776) + * Update port when reconnecting to pre-4.0 SSL storage (CASSANDRA-15727) + * Only calculate dynamicBadnessThreshold once per loop in DynamicEndpointSnitch (CASSANDRA-15798) + * Cleanup redundant nodetool commands added in 4.0 (CASSANDRA-15256) + * Update to Python driver 3.23 for cqlsh (CASSANDRA-15793) + * Add tunable initial size and growth factor to RangeTombstoneList (CASSANDRA-15763) + * Improve debug logging in SSTableReader for index summary (CASSANDRA-15755) + * bin/sstableverify should support user provided token ranges (CASSANDRA-15753) + * Improve logging when mutation passed to commit log is too large (CASSANDRA-14781) + * replace LZ4FastDecompressor with LZ4SafeDecompressor (CASSANDRA-15560) + * Fix buffer pool NPE with concurrent release due to in-progress tiny pool eviction (CASSANDRA-15726) + * Avoid race condition when completing stream sessions (CASSANDRA-15666) + * Flush with fast compressors by default (CASSANDRA-15379) + * Fix CqlInputFormat regression from the switch to system.size_estimates (CASSANDRA-15637) + * Allow sending Entire SSTables over SSL (CASSANDRA-15740) + * Fix CQLSH UTF-8 encoding issue for Python 2/3 compatibility (CASSANDRA-15739) + * Fix batch statement preparation when multiple tables and parameters are used (CASSANDRA-15730) + * Fix regression with traceOutgoingMessage printing message size (CASSANDRA-15687) + * Ensure repaired data tracking reads a consistent amount of data across replicas (CASSANDRA-15601) + * Fix CQLSH to avoid arguments being evaluated (CASSANDRA-15660) + * Correct Visibility and Improve Safety of Methods in LatencyMetrics (CASSANDRA-15597) + * Allow cqlsh to run with Python2.7/Python3.6+ (CASSANDRA-15659,CASSANDRA-15573) + * Improve logging around incremental repair (CASSANDRA-15599) + * Do not check cdc_raw_directory filesystem space if CDC disabled (CASSANDRA-15688) + * Replace array iterators with get by index (CASSANDRA-15394) + * Minimize BTree iterator allocations (CASSANDRA-15389) +Merged from 3.11: * Fix cqlsh output when fetching all rows in batch mode (CASSANDRA-15905) * Upgrade Jackson to 2.9.10 (CASSANDRA-15867) * Fix CQL formatting of read command restrictions for slow query log (CASSANDRA-15503) diff --cc src/java/org/apache/cassandra/cql3/CQL3Type.java index 8f8df42,d1e6809..dba063c --- a/src/java/org/apache/cassandra/cql3/CQL3Type.java +++ b/src/java/org/apache/cassandra/cql3/CQL3Type.java @@@ -581,9 -574,15 +586,9 @@@ public interface CQL3Typ public static Raw tuple(List<CQL3Type.Raw> ts) { - return new RawTuple(ts, false); + return new RawTuple(ts); } - public static Raw frozen(CQL3Type.Raw t) throws InvalidRequestException - { - t.freeze(); - return t; - } - private static class RawType extends Raw { private final CQL3Type type; @@@ -675,10 -665,11 +680,11 @@@ { assert values != null : "Got null values type for a collection"; - if (!frozen && values.supportsFreezing() && !values.frozen) + // skip if innerType is tuple, since tuple is implicitly forzen + if (!frozen && values.supportsFreezing() && !values.frozen && !values.isTuple()) throwNestedNonFrozenError(values); - // we represent Thrift supercolumns as maps, internally, and we do allow counters in supercolumns. Thus, + // we represent supercolumns as maps, internally, and we do allow counters in supercolumns. Thus, // for internal type parsing (think schema) we have to make an exception and allow counters as (map) values if (values.isCounter() && !isInternal) throw new InvalidRequestException("Counters are not allowed inside collections: " + this); @@@ -815,10 -807,11 +819,12 @@@ { private final List<CQL3Type.Raw> types; - private RawTuple(List<CQL3Type.Raw> types, boolean frozen) + private RawTuple(List<CQL3Type.Raw> types) { - super(frozen); - frozen = true; -- this.types = types; - freeze(); ++ super(true); ++ this.types = types.stream() ++ .map(t -> t.supportsFreezing() ? t.freeze() : t) ++ .collect(toList()); } public boolean supportsFreezing() @@@ -826,14 -819,13 +832,10 @@@ return true; } - public void freeze() throws InvalidRequestException + @Override + public RawTuple freeze() { - List<CQL3Type.Raw> frozenTypes = - types.stream() - .map(t -> t.supportsFreezing() ? t.freeze() : t) - .collect(toList()); - return new RawTuple(frozenTypes, true); - for (CQL3Type.Raw t : types) - if (t.supportsFreezing()) - t.freeze(); - - frozen = true; ++ return this; } public CQL3Type prepare(String keyspace, Types udts) throws InvalidRequestException diff --cc src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java index 462623d,0000000..cc9a96b mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java @@@ -1,334 -1,0 +1,334 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3.statements.schema; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Set; + +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; + +import org.apache.cassandra.audit.AuditLogContext; +import org.apache.cassandra.audit.AuditLogEntryType; +import org.apache.cassandra.auth.FunctionResource; +import org.apache.cassandra.auth.IResource; +import org.apache.cassandra.auth.Permission; +import org.apache.cassandra.cql3.*; +import org.apache.cassandra.cql3.functions.*; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.schema.Functions.FunctionsDiff; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.Keyspaces; +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.serializers.MarshalException; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.transport.Event.SchemaChange; +import org.apache.cassandra.transport.Event.SchemaChange.Change; +import org.apache.cassandra.transport.Event.SchemaChange.Target; +import org.apache.cassandra.transport.ProtocolVersion; + +import static java.lang.String.format; +import static java.lang.String.join; +import static java.util.Collections.singleton; +import static java.util.Collections.singletonList; +import static java.util.stream.Collectors.toList; + +import static com.google.common.collect.Iterables.concat; +import static com.google.common.collect.Iterables.transform; + +public final class CreateAggregateStatement extends AlterSchemaStatement +{ + private final String aggregateName; + private final List<CQL3Type.Raw> rawArgumentTypes; + private final CQL3Type.Raw rawStateType; + private final FunctionName stateFunctionName; + private final FunctionName finalFunctionName; + private final Term.Raw rawInitialValue; + private final boolean orReplace; + private final boolean ifNotExists; + + public CreateAggregateStatement(String keyspaceName, + String aggregateName, + List<CQL3Type.Raw> rawArgumentTypes, + CQL3Type.Raw rawStateType, + FunctionName stateFunctionName, + FunctionName finalFunctionName, + Term.Raw rawInitialValue, + boolean orReplace, + boolean ifNotExists) + { + super(keyspaceName); + this.aggregateName = aggregateName; + this.rawArgumentTypes = rawArgumentTypes; + this.rawStateType = rawStateType; + this.stateFunctionName = stateFunctionName; + this.finalFunctionName = finalFunctionName; + this.rawInitialValue = rawInitialValue; + this.orReplace = orReplace; + this.ifNotExists = ifNotExists; + } + + public Keyspaces apply(Keyspaces schema) + { + if (ifNotExists && orReplace) + throw ire("Cannot use both 'OR REPLACE' and 'IF NOT EXISTS' directives"); + + rawArgumentTypes.stream() - .filter(CQL3Type.Raw::isFrozen) ++ .filter(raw -> !raw.isTuple() && raw.isFrozen()) + .findFirst() + .ifPresent(t -> { throw ire("Argument '%s' cannot be frozen; remove frozen<> modifier from '%s'", t, t); }); + - if (rawStateType.isFrozen()) ++ if (!rawStateType.isTuple() && rawStateType.isFrozen()) + throw ire("State type '%s' cannot be frozen; remove frozen<> modifier from '%s'", rawStateType, rawStateType); + + KeyspaceMetadata keyspace = schema.getNullable(keyspaceName); + if (null == keyspace) + throw ire("Keyspace '%s' doesn't exist", keyspaceName); + + /* + * Resolve the state function + */ + + List<AbstractType<?>> argumentTypes = + rawArgumentTypes.stream() + .map(t -> t.prepare(keyspaceName, keyspace.types).getType()) + .collect(toList()); + AbstractType<?> stateType = rawStateType.prepare(keyspaceName, keyspace.types).getType(); + List<AbstractType<?>> stateFunctionArguments = Lists.newArrayList(concat(singleton(stateType), argumentTypes)); + + Function stateFunction = + keyspace.functions + .find(stateFunctionName, stateFunctionArguments) + .orElseThrow(() -> ire("State function %s doesn't exist", stateFunctionString())); + + if (stateFunction.isAggregate()) + throw ire("State function %s isn't a scalar function", stateFunctionString()); + + if (!stateFunction.returnType().equals(stateType)) + { + throw ire("State function %s return type must be the same as the first argument type - check STYPE, argument and return types", + stateFunctionString()); + } + + /* + * Resolve the final function and return type + */ + + Function finalFunction = null; + AbstractType<?> returnType = stateFunction.returnType(); + + if (null != finalFunctionName) + { + finalFunction = keyspace.functions.find(finalFunctionName, singletonList(stateType)).orElse(null); + if (null == finalFunction) + throw ire("Final function %s doesn't exist", finalFunctionString()); + + if (finalFunction.isAggregate()) + throw ire("Final function %s isn't a scalar function", finalFunctionString()); + + // override return type with that of the final function + returnType = finalFunction.returnType(); + } + + /* + * Validate initial condition + */ + + ByteBuffer initialValue = null; + if (null != rawInitialValue) + { + initialValue = Terms.asBytes(keyspaceName, rawInitialValue.toString(), stateType); + + if (null != initialValue) + { + try + { + stateType.validate(initialValue); + } + catch (MarshalException e) + { + throw ire("Invalid value for INITCOND of type %s", stateType.asCQL3Type()); + } + } + + // Converts initcond to a CQL literal and parse it back to avoid another CASSANDRA-11064 + String initialValueString = stateType.asCQL3Type().toCQLLiteral(initialValue, ProtocolVersion.CURRENT); + assert Objects.equal(initialValue, Terms.asBytes(keyspaceName, initialValueString, stateType)); + + if (Constants.NULL_LITERAL != rawInitialValue && UDHelper.isNullOrEmpty(stateType, initialValue)) + throw ire("INITCOND must not be empty for all types except TEXT, ASCII, BLOB"); + } + + if (!((UDFunction) stateFunction).isCalledOnNullInput() && null == initialValue) + { + throw ire("Cannot create aggregate '%s' without INITCOND because state function %s does not accept 'null' arguments", + aggregateName, + stateFunctionName); + } + + /* + * Create or replace + */ + + UDAggregate aggregate = + new UDAggregate(new FunctionName(keyspaceName, aggregateName), + argumentTypes, + returnType, + (ScalarFunction) stateFunction, + (ScalarFunction) finalFunction, + initialValue); + + Function existingAggregate = keyspace.functions.find(aggregate.name(), argumentTypes).orElse(null); + if (null != existingAggregate) + { + if (!existingAggregate.isAggregate()) + throw ire("Aggregate '%s' cannot replace a function", aggregateName); + + if (ifNotExists) + return schema; + + if (!orReplace) + throw ire("Aggregate '%s' already exists", aggregateName); + + if (!returnType.isCompatibleWith(existingAggregate.returnType())) + { + throw ire("Cannot replace aggregate '%s', the new return type %s isn't compatible with the return type %s of existing function", + aggregateName, + returnType.asCQL3Type(), + existingAggregate.returnType().asCQL3Type()); + } + } + + return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.functions.withAddedOrUpdated(aggregate))); + } + + SchemaChange schemaChangeEvent(KeyspacesDiff diff) + { + assert diff.altered.size() == 1; + FunctionsDiff<UDAggregate> udasDiff = diff.altered.get(0).udas; + + assert udasDiff.created.size() + udasDiff.altered.size() == 1; + boolean created = !udasDiff.created.isEmpty(); + + return new SchemaChange(created ? Change.CREATED : Change.UPDATED, + Target.AGGREGATE, + keyspaceName, + aggregateName, + rawArgumentTypes.stream().map(CQL3Type.Raw::toString).collect(toList())); + } + + public void authorize(ClientState client) + { + FunctionName name = new FunctionName(keyspaceName, aggregateName); + + if (Schema.instance.findFunction(name, Lists.transform(rawArgumentTypes, t -> t.prepare(keyspaceName).getType())).isPresent() && orReplace) + client.ensurePermission(Permission.ALTER, FunctionResource.functionFromCql(keyspaceName, aggregateName, rawArgumentTypes)); + else + client.ensurePermission(Permission.CREATE, FunctionResource.keyspace(keyspaceName)); + + FunctionResource stateFunction = + FunctionResource.functionFromCql(stateFunctionName, Lists.newArrayList(concat(singleton(rawStateType), rawArgumentTypes))); + client.ensurePermission(Permission.EXECUTE, stateFunction); + + if (null != finalFunctionName) + client.ensurePermission(Permission.EXECUTE, FunctionResource.functionFromCql(finalFunctionName, singletonList(rawStateType))); + } + + @Override + Set<IResource> createdResources(KeyspacesDiff diff) + { + assert diff.altered.size() == 1; + FunctionsDiff<UDAggregate> udasDiff = diff.altered.get(0).udas; + + assert udasDiff.created.size() + udasDiff.altered.size() == 1; + + return udasDiff.created.isEmpty() + ? ImmutableSet.of() + : ImmutableSet.of(FunctionResource.functionFromCql(keyspaceName, aggregateName, rawArgumentTypes)); + } + + @Override + public AuditLogContext getAuditLogContext() + { + return new AuditLogContext(AuditLogEntryType.CREATE_AGGREGATE, keyspaceName, aggregateName); + } + + public String toString() + { + return String.format("%s (%s, %s)", getClass().getSimpleName(), keyspaceName, aggregateName); + } + + private String stateFunctionString() + { + return format("%s(%s)", stateFunctionName, join(", ", transform(concat(singleton(rawStateType), rawArgumentTypes), Object::toString))); + } + + private String finalFunctionString() + { + return format("%s(%s)", finalFunctionName, rawStateType); + } + + public static final class Raw extends CQLStatement.Raw + { + private final FunctionName aggregateName; + private final List<CQL3Type.Raw> rawArgumentTypes; + private final CQL3Type.Raw rawStateType; + private final String stateFunctionName; + private final String finalFunctionName; + private final Term.Raw rawInitialValue; + private final boolean orReplace; + private final boolean ifNotExists; + + public Raw(FunctionName aggregateName, + List<CQL3Type.Raw> rawArgumentTypes, + CQL3Type.Raw rawStateType, + String stateFunctionName, + String finalFunctionName, + Term.Raw rawInitialValue, + boolean orReplace, + boolean ifNotExists) + { + this.aggregateName = aggregateName; + this.rawArgumentTypes = rawArgumentTypes; + this.rawStateType = rawStateType; + this.stateFunctionName = stateFunctionName; + this.finalFunctionName = finalFunctionName; + this.rawInitialValue = rawInitialValue; + this.orReplace = orReplace; + this.ifNotExists = ifNotExists; + } + + public CreateAggregateStatement prepare(ClientState state) + { + String keyspaceName = aggregateName.hasKeyspace() ? aggregateName.keyspace : state.getKeyspace(); + + return new CreateAggregateStatement(keyspaceName, + aggregateName.name, + rawArgumentTypes, + rawStateType, + new FunctionName(keyspaceName, stateFunctionName), + null != finalFunctionName ? new FunctionName(keyspaceName, finalFunctionName) : null, + rawInitialValue, + orReplace, + ifNotExists); + } + } +} diff --cc src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java index 20c4ad9,0000000..18d8479 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java @@@ -1,255 -1,0 +1,255 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3.statements.schema; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; + +import org.apache.cassandra.audit.AuditLogContext; +import org.apache.cassandra.audit.AuditLogEntryType; +import org.apache.cassandra.auth.FunctionResource; +import org.apache.cassandra.auth.IResource; +import org.apache.cassandra.auth.*; +import org.apache.cassandra.cql3.CQL3Type; +import org.apache.cassandra.cql3.CQLStatement; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.cql3.functions.Function; +import org.apache.cassandra.cql3.functions.FunctionName; +import org.apache.cassandra.cql3.functions.UDFunction; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.schema.Functions.FunctionsDiff; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.Keyspaces; +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.transport.Event.SchemaChange; +import org.apache.cassandra.transport.Event.SchemaChange.Change; +import org.apache.cassandra.transport.Event.SchemaChange.Target; + +import static java.util.stream.Collectors.toList; + +public final class CreateFunctionStatement extends AlterSchemaStatement +{ + private final String functionName; + private final List<ColumnIdentifier> argumentNames; + private final List<CQL3Type.Raw> rawArgumentTypes; + private final CQL3Type.Raw rawReturnType; + private final boolean calledOnNullInput; + private final String language; + private final String body; + private final boolean orReplace; + private final boolean ifNotExists; + + public CreateFunctionStatement(String keyspaceName, + String functionName, + List<ColumnIdentifier> argumentNames, + List<CQL3Type.Raw> rawArgumentTypes, + CQL3Type.Raw rawReturnType, + boolean calledOnNullInput, + String language, + String body, + boolean orReplace, + boolean ifNotExists) + { + super(keyspaceName); + this.functionName = functionName; + this.argumentNames = argumentNames; + this.rawArgumentTypes = rawArgumentTypes; + this.rawReturnType = rawReturnType; + this.calledOnNullInput = calledOnNullInput; + this.language = language; + this.body = body; + this.orReplace = orReplace; + this.ifNotExists = ifNotExists; + } + + // TODO: replace affected aggregates !! + public Keyspaces apply(Keyspaces schema) + { + if (ifNotExists && orReplace) + throw ire("Cannot use both 'OR REPLACE' and 'IF NOT EXISTS' directives"); + + UDFunction.assertUdfsEnabled(language); + + if (new HashSet<>(argumentNames).size() != argumentNames.size()) + throw ire("Duplicate argument names for given function %s with argument names %s", functionName, argumentNames); + + rawArgumentTypes.stream() - .filter(CQL3Type.Raw::isFrozen) ++ .filter(raw -> !raw.isTuple() && raw.isFrozen()) + .findFirst() + .ifPresent(t -> { throw ire("Argument '%s' cannot be frozen; remove frozen<> modifier from '%s'", t, t); }); + - if (rawReturnType.isFrozen()) ++ if (!rawReturnType.isTuple() && rawReturnType.isFrozen()) + throw ire("Return type '%s' cannot be frozen; remove frozen<> modifier from '%s'", rawReturnType, rawReturnType); + + KeyspaceMetadata keyspace = schema.getNullable(keyspaceName); + if (null == keyspace) + throw ire("Keyspace '%s' doesn't exist", keyspaceName); + + List<AbstractType<?>> argumentTypes = + rawArgumentTypes.stream() + .map(t -> t.prepare(keyspaceName, keyspace.types).getType()) + .collect(toList()); + AbstractType<?> returnType = rawReturnType.prepare(keyspaceName, keyspace.types).getType(); + + UDFunction function = + UDFunction.create(new FunctionName(keyspaceName, functionName), + argumentNames, + argumentTypes, + returnType, + calledOnNullInput, + language, + body); + + Function existingFunction = keyspace.functions.find(function.name(), argumentTypes).orElse(null); + if (null != existingFunction) + { + if (existingFunction.isAggregate()) + throw ire("Function '%s' cannot replace an aggregate", functionName); + + if (ifNotExists) + return schema; + + if (!orReplace) + throw ire("Function '%s' already exists", functionName); + + if (calledOnNullInput != ((UDFunction) existingFunction).isCalledOnNullInput()) + { + throw ire("Function '%s' must have %s directive", + functionName, + calledOnNullInput ? "CALLED ON NULL INPUT" : "RETURNS NULL ON NULL INPUT"); + } + + if (!returnType.isCompatibleWith(existingFunction.returnType())) + { + throw ire("Cannot replace function '%s', the new return type %s is not compatible with the return type %s of existing function", + functionName, + returnType.asCQL3Type(), + existingFunction.returnType().asCQL3Type()); + } + + // TODO: update dependent aggregates + } + + return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.functions.withAddedOrUpdated(function))); + } + + SchemaChange schemaChangeEvent(KeyspacesDiff diff) + { + assert diff.altered.size() == 1; + FunctionsDiff<UDFunction> udfsDiff = diff.altered.get(0).udfs; + + assert udfsDiff.created.size() + udfsDiff.altered.size() == 1; + boolean created = !udfsDiff.created.isEmpty(); + + return new SchemaChange(created ? Change.CREATED : Change.UPDATED, + Target.FUNCTION, + keyspaceName, + functionName, + rawArgumentTypes.stream().map(CQL3Type.Raw::toString).collect(toList())); + } + + public void authorize(ClientState client) + { + FunctionName name = new FunctionName(keyspaceName, functionName); + + if (Schema.instance.findFunction(name, Lists.transform(rawArgumentTypes, t -> t.prepare(keyspaceName).getType())).isPresent() && orReplace) + client.ensurePermission(Permission.ALTER, FunctionResource.functionFromCql(keyspaceName, functionName, rawArgumentTypes)); + else + client.ensurePermission(Permission.CREATE, FunctionResource.keyspace(keyspaceName)); + } + + @Override + Set<IResource> createdResources(KeyspacesDiff diff) + { + assert diff.altered.size() == 1; + FunctionsDiff<UDFunction> udfsDiff = diff.altered.get(0).udfs; + + assert udfsDiff.created.size() + udfsDiff.altered.size() == 1; + + return udfsDiff.created.isEmpty() + ? ImmutableSet.of() + : ImmutableSet.of(FunctionResource.functionFromCql(keyspaceName, functionName, rawArgumentTypes)); + } + + @Override + public AuditLogContext getAuditLogContext() + { + return new AuditLogContext(AuditLogEntryType.CREATE_FUNCTION, keyspaceName, functionName); + } + + public String toString() + { + return String.format("%s (%s, %s)", getClass().getSimpleName(), keyspaceName, functionName); + } + + public static final class Raw extends CQLStatement.Raw + { + private final FunctionName name; + private final List<ColumnIdentifier> argumentNames; + private final List<CQL3Type.Raw> rawArgumentTypes; + private final CQL3Type.Raw rawReturnType; + private final boolean calledOnNullInput; + private final String language; + private final String body; + private final boolean orReplace; + private final boolean ifNotExists; + + public Raw(FunctionName name, + List<ColumnIdentifier> argumentNames, + List<CQL3Type.Raw> rawArgumentTypes, + CQL3Type.Raw rawReturnType, + boolean calledOnNullInput, + String language, + String body, + boolean orReplace, + boolean ifNotExists) + { + this.name = name; + this.argumentNames = argumentNames; + this.rawArgumentTypes = rawArgumentTypes; + this.rawReturnType = rawReturnType; + this.calledOnNullInput = calledOnNullInput; + this.language = language; + this.body = body; + this.orReplace = orReplace; + this.ifNotExists = ifNotExists; + } + + public CreateFunctionStatement prepare(ClientState state) + { + String keyspaceName = name.hasKeyspace() ? name.keyspace : state.getKeyspace(); + + return new CreateFunctionStatement(keyspaceName, + name.name, + argumentNames, + rawArgumentTypes, + rawReturnType, + calledOnNullInput, + language, + body, + orReplace, + ifNotExists); + } + } +} diff --cc src/java/org/apache/cassandra/cql3/statements/schema/DropAggregateStatement.java index d24f77e,0000000..7302158 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/DropAggregateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/DropAggregateStatement.java @@@ -1,179 -1,0 +1,179 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3.statements.schema; + +import java.util.Collection; +import java.util.List; +import java.util.function.Predicate; +import java.util.stream.Stream; + +import org.apache.cassandra.audit.AuditLogContext; +import org.apache.cassandra.audit.AuditLogEntryType; +import org.apache.cassandra.auth.FunctionResource; +import org.apache.cassandra.auth.Permission; +import org.apache.cassandra.cql3.CQL3Type; +import org.apache.cassandra.cql3.CQLStatement; +import org.apache.cassandra.cql3.functions.Function; +import org.apache.cassandra.cql3.functions.FunctionName; +import org.apache.cassandra.cql3.functions.UDAggregate; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.schema.*; +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.transport.Event.SchemaChange; +import org.apache.cassandra.transport.Event.SchemaChange.Change; + +import static java.lang.String.format; +import static java.lang.String.join; +import static java.util.stream.Collectors.toList; + +import static com.google.common.collect.Iterables.transform; + +public final class DropAggregateStatement extends AlterSchemaStatement +{ + private final String aggregateName; + private final List<CQL3Type.Raw> arguments; + private final boolean argumentsSpeficied; + private final boolean ifExists; + + public DropAggregateStatement(String keyspaceName, + String aggregateName, + List<CQL3Type.Raw> arguments, + boolean argumentsSpeficied, + boolean ifExists) + { + super(keyspaceName); + this.aggregateName = aggregateName; + this.arguments = arguments; + this.argumentsSpeficied = argumentsSpeficied; + this.ifExists = ifExists; + } + + public Keyspaces apply(Keyspaces schema) + { + String name = + argumentsSpeficied + ? format("%s.%s(%s)", keyspaceName, aggregateName, join(", ", transform(arguments, CQL3Type.Raw::toString))) + : format("%s.%s", keyspaceName, aggregateName); + + KeyspaceMetadata keyspace = schema.getNullable(keyspaceName); + if (null == keyspace) + { + if (ifExists) + return schema; + + throw ire("Aggregate '%s' doesn't exist", name); + } + + Collection<Function> aggregates = keyspace.functions.get(new FunctionName(keyspaceName, aggregateName)); + if (aggregates.size() > 1 && !argumentsSpeficied) + { + throw ire("'DROP AGGREGATE %s' matches multiple function definitions; " + + "specify the argument types by issuing a statement like " + + "'DROP AGGREGATE %s (type, type, ...)'. You can use cqlsh " + + "'DESCRIBE AGGREGATE %s' command to find all overloads", + aggregateName, aggregateName, aggregateName); + } + + arguments.stream() - .filter(CQL3Type.Raw::isFrozen) ++ .filter(raw -> !raw.isTuple() && raw.isFrozen()) + .findFirst() + .ifPresent(t -> { throw ire("Argument '%s' cannot be frozen; remove frozen<> modifier from '%s'", t, t); }); + + List<AbstractType<?>> argumentTypes = prepareArgumentTypes(keyspace.types); + + Predicate<Function> filter = Functions.Filter.UDA; + if (argumentsSpeficied) + filter = filter.and(f -> Functions.typesMatch(f.argTypes(), argumentTypes)); + + Function aggregate = aggregates.stream().filter(filter).findAny().orElse(null); + if (null == aggregate) + { + if (ifExists) + return schema; + + throw ire("Aggregate '%s' doesn't exist", name); + } + + return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.functions.without(aggregate))); + } + + SchemaChange schemaChangeEvent(KeyspacesDiff diff) + { + Functions dropped = diff.altered.get(0).udas.dropped; + assert dropped.size() == 1; + return SchemaChange.forAggregate(Change.DROPPED, (UDAggregate) dropped.iterator().next()); + } + + public void authorize(ClientState client) + { + KeyspaceMetadata keyspace = Schema.instance.getKeyspaceMetadata(keyspaceName); + if (null == keyspace) + return; + + Stream<Function> functions = keyspace.functions.get(new FunctionName(keyspaceName, aggregateName)).stream(); + if (argumentsSpeficied) + functions = functions.filter(f -> Functions.typesMatch(f.argTypes(), prepareArgumentTypes(keyspace.types))); + + functions.forEach(f -> client.ensurePermission(Permission.DROP, FunctionResource.function(f))); + } + + @Override + public AuditLogContext getAuditLogContext() + { + return new AuditLogContext(AuditLogEntryType.DROP_AGGREGATE, keyspaceName, aggregateName); + } + + public String toString() + { + return String.format("%s (%s, %s)", getClass().getSimpleName(), keyspaceName, aggregateName); + } + + private List<AbstractType<?>> prepareArgumentTypes(Types types) + { + return arguments.stream() + .map(t -> t.prepare(keyspaceName, types)) + .map(CQL3Type::getType) + .collect(toList()); + } + + public static final class Raw extends CQLStatement.Raw + { + private final FunctionName name; + private final List<CQL3Type.Raw> arguments; + private final boolean argumentsSpecified; + private final boolean ifExists; + + public Raw(FunctionName name, + List<CQL3Type.Raw> arguments, + boolean argumentsSpecified, + boolean ifExists) + { + this.name = name; + this.arguments = arguments; + this.argumentsSpecified = argumentsSpecified; + this.ifExists = ifExists; + } + + public DropAggregateStatement prepare(ClientState state) + { + String keyspaceName = name.hasKeyspace() ? name.keyspace : state.getKeyspace(); + return new DropAggregateStatement(keyspaceName, name.name, arguments, argumentsSpecified, ifExists); + } + } +} diff --cc src/java/org/apache/cassandra/cql3/statements/schema/DropFunctionStatement.java index f7d7d4a,0000000..99bfd64 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/DropFunctionStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/DropFunctionStatement.java @@@ -1,187 -1,0 +1,187 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3.statements.schema; + +import java.util.Collection; +import java.util.List; +import java.util.function.Predicate; +import java.util.stream.Stream; + +import org.apache.cassandra.audit.AuditLogContext; +import org.apache.cassandra.audit.AuditLogEntryType; +import org.apache.cassandra.auth.FunctionResource; +import org.apache.cassandra.auth.Permission; +import org.apache.cassandra.cql3.CQL3Type; +import org.apache.cassandra.cql3.CQLStatement; +import org.apache.cassandra.cql3.functions.*; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.schema.*; +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.transport.Event.SchemaChange; +import org.apache.cassandra.transport.Event.SchemaChange.Change; + +import static java.lang.String.format; +import static java.lang.String.join; +import static java.util.stream.Collectors.joining; +import static java.util.stream.Collectors.toList; + +import static com.google.common.collect.Iterables.transform; + +public final class DropFunctionStatement extends AlterSchemaStatement +{ + private final String functionName; + private final Collection<CQL3Type.Raw> arguments; + private final boolean argumentsSpeficied; + private final boolean ifExists; + + public DropFunctionStatement(String keyspaceName, + String functionName, + Collection<CQL3Type.Raw> arguments, + boolean argumentsSpeficied, + boolean ifExists) + { + super(keyspaceName); + this.functionName = functionName; + this.arguments = arguments; + this.argumentsSpeficied = argumentsSpeficied; + this.ifExists = ifExists; + } + + public Keyspaces apply(Keyspaces schema) + { + String name = + argumentsSpeficied + ? format("%s.%s(%s)", keyspaceName, functionName, join(", ", transform(arguments, CQL3Type.Raw::toString))) + : format("%s.%s", keyspaceName, functionName); + + KeyspaceMetadata keyspace = schema.getNullable(keyspaceName); + if (null == keyspace) + { + if (ifExists) + return schema; + + throw ire("Function '%s' doesn't exist", name); + } + + Collection<Function> functions = keyspace.functions.get(new FunctionName(keyspaceName, functionName)); + if (functions.size() > 1 && !argumentsSpeficied) + { + throw ire("'DROP FUNCTION %s' matches multiple function definitions; " + + "specify the argument types by issuing a statement like " + + "'DROP FUNCTION %s (type, type, ...)'. You can use cqlsh " + + "'DESCRIBE FUNCTION %s' command to find all overloads", + functionName, functionName, functionName); + } + + arguments.stream() - .filter(CQL3Type.Raw::isFrozen) ++ .filter(raw -> !raw.isTuple() && raw.isFrozen()) + .findFirst() + .ifPresent(t -> { throw ire("Argument '%s' cannot be frozen; remove frozen<> modifier from '%s'", t, t); }); + + List<AbstractType<?>> argumentTypes = prepareArgumentTypes(keyspace.types); + + Predicate<Function> filter = Functions.Filter.UDF; + if (argumentsSpeficied) + filter = filter.and(f -> Functions.typesMatch(f.argTypes(), argumentTypes)); + + Function function = functions.stream().filter(filter).findAny().orElse(null); + if (null == function) + { + if (ifExists) + return schema; + + throw ire("Function '%s' doesn't exist", name); + } + + String dependentAggregates = + keyspace.functions + .aggregatesUsingFunction(function) + .map(a -> a.name().toString()) + .collect(joining(", ")); + + if (!dependentAggregates.isEmpty()) + throw ire("Function '%s' is still referenced by aggregates %s", name, dependentAggregates); + + return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.functions.without(function))); + } + + SchemaChange schemaChangeEvent(KeyspacesDiff diff) + { + Functions dropped = diff.altered.get(0).udfs.dropped; + assert dropped.size() == 1; + return SchemaChange.forFunction(Change.DROPPED, (UDFunction) dropped.iterator().next()); + } + + public void authorize(ClientState client) + { + KeyspaceMetadata keyspace = Schema.instance.getKeyspaceMetadata(keyspaceName); + if (null == keyspace) + return; + + Stream<Function> functions = keyspace.functions.get(new FunctionName(keyspaceName, functionName)).stream(); + if (argumentsSpeficied) + functions = functions.filter(f -> Functions.typesMatch(f.argTypes(), prepareArgumentTypes(keyspace.types))); + + functions.forEach(f -> client.ensurePermission(Permission.DROP, FunctionResource.function(f))); + } + + @Override + public AuditLogContext getAuditLogContext() + { + return new AuditLogContext(AuditLogEntryType.DROP_FUNCTION, keyspaceName, functionName); + } + + public String toString() + { + return String.format("%s (%s, %s)", getClass().getSimpleName(), keyspaceName, functionName); + } + + private List<AbstractType<?>> prepareArgumentTypes(Types types) + { + return arguments.stream() + .map(t -> t.prepare(keyspaceName, types)) + .map(CQL3Type::getType) + .collect(toList()); + } + + public static final class Raw extends CQLStatement.Raw + { + private final FunctionName name; + private final List<CQL3Type.Raw> arguments; + private final boolean argumentsSpecified; + private final boolean ifExists; + + public Raw(FunctionName name, + List<CQL3Type.Raw> arguments, + boolean argumentsSpecified, + boolean ifExists) + { + this.name = name; + this.arguments = arguments; + this.argumentsSpecified = argumentsSpecified; + this.ifExists = ifExists; + } + + public DropFunctionStatement prepare(ClientState state) + { + String keyspaceName = name.hasKeyspace() ? name.keyspace : state.getKeyspace(); + return new DropFunctionStatement(keyspaceName, name.name, arguments, argumentsSpecified, ifExists); + } + } +} diff --cc test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java index d21d159,e28af27..d691374 --- a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java @@@ -122,6 -110,19 +122,19 @@@ public class UFTest extends CQLTeste assertLastSchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.FUNCTION, KEYSPACE, parseFunctionName(f).name, "double", "double"); + + // The function with nested tuple should be created without throwing InvalidRequestException. See CASSANDRA-15857 + String f1 = createFunction(KEYSPACE, + "list<tuple<int, int>>, double", + "CREATE OR REPLACE FUNCTION %s(state list<tuple<int, int>>, val double) " + + "RETURNS NULL ON NULL INPUT " + + "RETURNS double " + + "LANGUAGE javascript " + + "AS '\"string\";';"); + + assertLastSchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.FUNCTION, + KEYSPACE, parseFunctionName(f1).name, - "list<frozen<tuple<int, int>>>", "double"); // CASSANDRA-14825: remove frozen from param ++ "list<tuple<int, int>>", "double"); } @Test diff --cc test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java index 703d0ad,b2cf5dd..3440748 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java @@@ -449,6 -459,26 +449,26 @@@ public class AggregationTest extends CQ assertLastSchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.AGGREGATE, KEYSPACE, parseFunctionName(a).name, "double"); + + // The aggregate with nested tuple should be created without throwing InvalidRequestException. See CASSANDRA-15857 + String f1 = createFunction(KEYSPACE, + "double, double", + "CREATE OR REPLACE FUNCTION %s(state double, val list<tuple<int, int>>) " + + "RETURNS NULL ON NULL INPUT " + + "RETURNS double " + + "LANGUAGE javascript " + + "AS '\"string\";';"); + + String a1 = createAggregate(KEYSPACE, + "list<tuple<int, int>>", + "CREATE OR REPLACE AGGREGATE %s(list<tuple<int, int>>) " + + "SFUNC " + shortFunctionName(f1) + " " + + "STYPE double " + + "INITCOND 0"); + + assertLastSchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.AGGREGATE, + KEYSPACE, parseFunctionName(a1).name, - "list<frozen<tuple<int, int>>>"); // CASSANDRA-14825: remove frozen from param ++ "list<tuple<int, int>>"); } @Test diff --cc test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java index f035658,dbeefbb..31c588b --- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java @@@ -27,8 -29,11 +29,11 @@@ import com.google.common.collect.Immuta import com.google.common.collect.ImmutableMap; import com.google.common.io.Files; -import org.apache.commons.io.FileUtils; -import org.junit.After; + import org.junit.Before; import org.junit.BeforeClass; ++import org.junit.Rule; import org.junit.Test; ++import org.junit.rules.TemporaryFolder; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; @@@ -52,11 -57,18 +57,20 @@@ import static org.junit.Assert.fail public class CQLSSTableWriterTest { + private static final AtomicInteger idGen = new AtomicInteger(0); + private String keyspace; + private String table; + private String qualifiedTable; + private File dataDir; - private File tempDir; + static { DatabaseDescriptor.daemonInitialization(); } ++ @Rule ++ public TemporaryFolder tempFolder = new TemporaryFolder(); ++ @BeforeClass public static void setup() throws Exception { @@@ -66,6 -77,24 +80,16 @@@ StorageService.instance.initServer(); } + @Before - public void perTestSetup() ++ public void perTestSetup() throws IOException + { - tempDir = Files.createTempDir(); + keyspace = "cql_keyspace" + idGen.incrementAndGet(); + table = "table" + idGen.incrementAndGet(); + qualifiedTable = keyspace + '.' + table; - dataDir = new File(tempDir.getAbsolutePath() + File.separator + keyspace + File.separator + table); ++ dataDir = new File(tempFolder.newFolder().getAbsolutePath() + File.separator + keyspace + File.separator + table); + assert dataDir.mkdirs(); + } + - @After - public void cleanup() throws IOException - { - FileUtils.deleteDirectory(tempDir); - } - - @Test public void testUnsortedWriter() throws Exception { @@@ -347,10 -343,10 +338,10 @@@ } writer.close(); - loadSSTables(dataDir, KS); + loadSSTables(dataDir, keyspace); - UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." + TABLE); + UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + keyspace + "." + table); - TypeCodec collectionCodec = UDHelper.codecFor(DataType.CollectionType.frozenList(tuple2Type)); + TypeCodec collectionCodec = UDHelper.codecFor(DataType.CollectionType.list(tuple2Type)); TypeCodec tuple3Codec = UDHelper.codecFor(tuple3Type); assertEquals(resultSet.size(), 100); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
