Merge branch 'cassandra-3.11' into trunk

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1e20d951
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1e20d951
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1e20d951

Branch: refs/heads/trunk
Commit: 1e20d9513073194b02c2cd5c324f118d38e16f19
Parents: 87169c8 1a37992
Author: Jeff Jirsa <j...@jeffjirsa.net>
Authored: Mon Apr 24 09:11:30 2017 -0700
Committer: Jeff Jirsa <j...@jeffjirsa.net>
Committed: Mon Apr 24 09:11:56 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                                | 1 +
 src/java/org/apache/cassandra/schema/MigrationManager.java | 9 +++++++--
 2 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e20d951/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e20d951/src/java/org/apache/cassandra/schema/MigrationManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/schema/MigrationManager.java
index d34a096,0000000..7ad8cad
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/MigrationManager.java
+++ b/src/java/org/apache/cassandra/schema/MigrationManager.java
@@@ -1,493 -1,0 +1,498 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.schema;
 +
 +import java.io.IOException;
 +import java.net.InetAddress;
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.lang.management.ManagementFactory;
 +import java.lang.management.RuntimeMXBean;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import org.apache.cassandra.concurrent.ScheduledExecutors;
 +import org.apache.cassandra.concurrent.Stage;
 +import org.apache.cassandra.concurrent.StageManager;
 +import org.apache.cassandra.cql3.functions.UDAggregate;
 +import org.apache.cassandra.cql3.functions.UDFunction;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.marshal.UserType;
 +import org.apache.cassandra.exceptions.AlreadyExistsException;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.gms.*;
 +import org.apache.cassandra.io.IVersionedSerializer;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.net.MessageOut;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.WrappedRunnable;
 +
 +public class MigrationManager
 +{
 +    private static final Logger logger = 
LoggerFactory.getLogger(MigrationManager.class);
 +
 +    public static final MigrationManager instance = new MigrationManager();
 +
 +    private static final RuntimeMXBean runtimeMXBean = 
ManagementFactory.getRuntimeMXBean();
 +
 +    private static final int MIGRATION_DELAY_IN_MS = 60000;
 +
 +    private static final int MIGRATION_TASK_WAIT_IN_SECONDS = 
Integer.parseInt(System.getProperty("cassandra.migration_task_wait_in_seconds", 
"1"));
 +
 +    private MigrationManager() {}
 +
 +    public static void scheduleSchemaPull(InetAddress endpoint, EndpointState 
state)
 +    {
 +        VersionedValue value = 
state.getApplicationState(ApplicationState.SCHEMA);
 +
 +        if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && value != 
null)
 +            maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint);
 +    }
 +
 +    /**
 +     * If versions differ this node sends request with local migration list 
to the endpoint
 +     * and expecting to receive a list of migrations to apply locally.
 +     */
 +    private static void maybeScheduleSchemaPull(final UUID theirVersion, 
final InetAddress endpoint)
 +    {
 +        if ((Schema.instance.getVersion() != null && 
Schema.instance.getVersion().equals(theirVersion)) || 
!shouldPullSchemaFrom(endpoint))
 +        {
 +            logger.debug("Not pulling schema because versions match or 
shouldPullSchemaFrom returned false");
 +            return;
 +        }
 +
 +        if (SchemaConstants.emptyVersion.equals(Schema.instance.getVersion()) 
|| runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
 +        {
 +            // If we think we may be bootstrapping or have recently started, 
submit MigrationTask immediately
 +            logger.debug("Submitting migration task for {}", endpoint);
 +            submitMigrationTask(endpoint);
 +        }
 +        else
 +        {
 +            // Include a delay to make sure we have a chance to apply any 
changes being
 +            // pushed out simultaneously. See CASSANDRA-5025
 +            Runnable runnable = () ->
 +            {
 +                // grab the latest version of the schema since it may have 
changed again since the initial scheduling
 +                EndpointState epState = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
 +                if (epState == null)
 +                {
 +                    logger.debug("epState vanished for {}, not submitting 
migration task", endpoint);
 +                    return;
 +                }
 +                VersionedValue value = 
epState.getApplicationState(ApplicationState.SCHEMA);
 +                UUID currentVersion = UUID.fromString(value.value);
 +                if (Schema.instance.getVersion().equals(currentVersion))
 +                {
 +                    logger.debug("not submitting migration task for {} 
because our versions match", endpoint);
 +                    return;
 +                }
 +                logger.debug("submitting migration task for {}", endpoint);
 +                submitMigrationTask(endpoint);
 +            };
 +            ScheduledExecutors.nonPeriodicTasks.schedule(runnable, 
MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
 +        }
 +    }
 +
 +    private static Future<?> submitMigrationTask(InetAddress endpoint)
 +    {
 +        /*
 +         * Do not de-ref the future because that causes distributed deadlock 
(CASSANDRA-3832) because we are
 +         * running in the gossip stage.
 +         */
 +        return StageManager.getStage(Stage.MIGRATION).submit(new 
MigrationTask(endpoint));
 +    }
 +
 +    static boolean shouldPullSchemaFrom(InetAddress endpoint)
 +    {
 +        /*
 +         * Don't request schema from nodes with a differnt or unknonw major 
version (may have incompatible schema)
 +         * Don't request schema from fat clients
 +         */
 +        return MessagingService.instance().knowsVersion(endpoint)
 +                && MessagingService.instance().getRawVersion(endpoint) == 
MessagingService.current_version
 +                && !Gossiper.instance.isGossipOnlyMember(endpoint);
 +    }
 +
 +    public static boolean isReadyForBootstrap()
 +    {
 +        return MigrationTask.getInflightTasks().isEmpty();
 +    }
 +
 +    public static void waitUntilReadyForBootstrap()
 +    {
 +        CountDownLatch completionLatch;
 +        while ((completionLatch = MigrationTask.getInflightTasks().poll()) != 
null)
 +        {
 +            try
 +            {
 +                if (!completionLatch.await(MIGRATION_TASK_WAIT_IN_SECONDS, 
TimeUnit.SECONDS))
 +                    logger.error("Migration task failed to complete");
 +            }
 +            catch (InterruptedException e)
 +            {
 +                Thread.currentThread().interrupt();
 +                logger.error("Migration task was interrupted");
 +            }
 +        }
 +    }
 +
 +    public static void announceNewKeyspace(KeyspaceMetadata ksm) throws 
ConfigurationException
 +    {
 +        announceNewKeyspace(ksm, false);
 +    }
 +
 +    public static void announceNewKeyspace(KeyspaceMetadata ksm, boolean 
announceLocally) throws ConfigurationException
 +    {
 +        announceNewKeyspace(ksm, FBUtilities.timestampMicros(), 
announceLocally);
 +    }
 +
 +    public static void announceNewKeyspace(KeyspaceMetadata ksm, long 
timestamp, boolean announceLocally) throws ConfigurationException
 +    {
 +        ksm.validate();
 +
 +        if (Schema.instance.getKeyspaceMetadata(ksm.name) != null)
 +            throw new AlreadyExistsException(ksm.name);
 +
 +        logger.info("Create new Keyspace: {}", ksm);
 +        announce(SchemaKeyspace.makeCreateKeyspaceMutation(ksm, timestamp), 
announceLocally);
 +    }
 +
 +    public static void announceNewTable(TableMetadata cfm) throws 
ConfigurationException
 +    {
 +        announceNewTable(cfm, false);
 +    }
 +
 +    public static void announceNewTable(TableMetadata cfm, boolean 
announceLocally)
 +    {
 +        announceNewTable(cfm, announceLocally, true);
 +    }
 +
 +    /**
 +     * Announces the table even if the definition is already know locally.
 +     * This should generally be avoided but is used internally when we want 
to force the most up to date version of
 +     * a system table schema (Note that we don't know if the schema we force 
_is_ the most recent version or not, we
 +     * just rely on idempotency to basically ignore that announce if it's 
not. That's why we can't use announceUpdateColumnFamily,
 +     * it would for instance delete new columns if this is not called with 
the most up-to-date version)
 +     *
 +     * Note that this is only safe for system tables where we know the id is 
fixed and will be the same whatever version
 +     * of the definition is used.
 +     */
 +    public static void forceAnnounceNewTable(TableMetadata cfm)
 +    {
-         announceNewTable(cfm, false, false);
++        announceNewTable(cfm, false, false, 0);
 +    }
 +
 +    private static void announceNewTable(TableMetadata cfm, boolean 
announceLocally, boolean throwOnDuplicate)
 +    {
++        announceNewTable(cfm, announceLocally, throwOnDuplicate, 
FBUtilities.timestampMicros());
++    }
++
++    private static void announceNewTable(TableMetadata cfm, boolean 
announceLocally, boolean throwOnDuplicate, long timestamp)
++    {
 +        cfm.validate();
 +
 +        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(cfm.keyspace);
 +        if (ksm == null)
 +            throw new ConfigurationException(String.format("Cannot add table 
'%s' to non existing keyspace '%s'.", cfm.name, cfm.keyspace));
 +        // If we have a table or a view which has the same name, we can't add 
a new one
 +        else if (throwOnDuplicate && ksm.getTableOrViewNullable(cfm.name) != 
null)
 +            throw new AlreadyExistsException(cfm.keyspace, cfm.name);
 +
 +        logger.info("Create new table: {}", cfm);
-         announce(SchemaKeyspace.makeCreateTableMutation(ksm, cfm, 
FBUtilities.timestampMicros()), announceLocally);
++        announce(SchemaKeyspace.makeCreateTableMutation(ksm, cfm, timestamp), 
announceLocally);
 +    }
 +
 +    public static void announceNewView(ViewMetadata view, boolean 
announceLocally) throws ConfigurationException
 +    {
 +        view.metadata.validate();
 +
 +        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(view.keyspace);
 +        if (ksm == null)
 +            throw new ConfigurationException(String.format("Cannot add table 
'%s' to non existing keyspace '%s'.", view.name, view.keyspace));
 +        else if (ksm.getTableOrViewNullable(view.name) != null)
 +            throw new AlreadyExistsException(view.keyspace, view.name);
 +
 +        logger.info("Create new view: {}", view);
 +        announce(SchemaKeyspace.makeCreateViewMutation(ksm, view, 
FBUtilities.timestampMicros()), announceLocally);
 +    }
 +
 +    public static void announceNewType(UserType newType, boolean 
announceLocally)
 +    {
 +        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(newType.keyspace);
 +        announce(SchemaKeyspace.makeCreateTypeMutation(ksm, newType, 
FBUtilities.timestampMicros()), announceLocally);
 +    }
 +
 +    public static void announceNewFunction(UDFunction udf, boolean 
announceLocally)
 +    {
 +        logger.info("Create scalar function '{}'", udf.name());
 +        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(udf.name().keyspace);
 +        announce(SchemaKeyspace.makeCreateFunctionMutation(ksm, udf, 
FBUtilities.timestampMicros()), announceLocally);
 +    }
 +
 +    public static void announceNewAggregate(UDAggregate udf, boolean 
announceLocally)
 +    {
 +        logger.info("Create aggregate function '{}'", udf.name());
 +        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(udf.name().keyspace);
 +        announce(SchemaKeyspace.makeCreateAggregateMutation(ksm, udf, 
FBUtilities.timestampMicros()), announceLocally);
 +    }
 +
 +    public static void announceKeyspaceUpdate(KeyspaceMetadata ksm) throws 
ConfigurationException
 +    {
 +        announceKeyspaceUpdate(ksm, false);
 +    }
 +
 +    public static void announceKeyspaceUpdate(KeyspaceMetadata ksm, boolean 
announceLocally) throws ConfigurationException
 +    {
 +        ksm.validate();
 +
 +        KeyspaceMetadata oldKsm = 
Schema.instance.getKeyspaceMetadata(ksm.name);
 +        if (oldKsm == null)
 +            throw new ConfigurationException(String.format("Cannot update non 
existing keyspace '%s'.", ksm.name));
 +
 +        logger.info("Update Keyspace '{}' From {} To {}", ksm.name, oldKsm, 
ksm);
 +        announce(SchemaKeyspace.makeCreateKeyspaceMutation(ksm.name, 
ksm.params, FBUtilities.timestampMicros()), announceLocally);
 +    }
 +
 +    public static void announceTableUpdate(TableMetadata tm) throws 
ConfigurationException
 +    {
 +        announceTableUpdate(tm, false);
 +    }
 +
 +    public static void announceTableUpdate(TableMetadata updated, boolean 
announceLocally) throws ConfigurationException
 +    {
 +        updated.validate();
 +
 +        TableMetadata current = 
Schema.instance.getTableMetadata(updated.keyspace, updated.name);
 +        if (current == null)
 +            throw new ConfigurationException(String.format("Cannot update non 
existing table '%s' in keyspace '%s'.", updated.name, updated.keyspace));
 +        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(current.keyspace);
 +
 +        current.validateCompatibility(updated);
 +
 +        logger.info("Update table '{}/{}' From {} To {}", current.keyspace, 
current.name, current, updated);
 +        announce(SchemaKeyspace.makeUpdateTableMutation(ksm, current, 
updated, FBUtilities.timestampMicros()), announceLocally);
 +    }
 +
 +    public static void announceViewUpdate(ViewMetadata view, boolean 
announceLocally) throws ConfigurationException
 +    {
 +        view.metadata.validate();
 +
 +        ViewMetadata oldView = Schema.instance.getView(view.keyspace, 
view.name);
 +        if (oldView == null)
 +            throw new ConfigurationException(String.format("Cannot update non 
existing materialized view '%s' in keyspace '%s'.", view.name, view.keyspace));
 +        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(view.keyspace);
 +
 +        oldView.metadata.validateCompatibility(view.metadata);
 +
 +        logger.info("Update view '{}/{}' From {} To {}", view.keyspace, 
view.name, oldView, view);
 +        announce(SchemaKeyspace.makeUpdateViewMutation(ksm, oldView, view, 
FBUtilities.timestampMicros()), announceLocally);
 +    }
 +
 +    public static void announceTypeUpdate(UserType updatedType, boolean 
announceLocally)
 +    {
 +        logger.info("Update type '{}.{}' to {}", updatedType.keyspace, 
updatedType.getNameAsString(), updatedType);
 +        announceNewType(updatedType, announceLocally);
 +    }
 +
 +    public static void announceKeyspaceDrop(String ksName) throws 
ConfigurationException
 +    {
 +        announceKeyspaceDrop(ksName, false);
 +    }
 +
 +    public static void announceKeyspaceDrop(String ksName, boolean 
announceLocally) throws ConfigurationException
 +    {
 +        KeyspaceMetadata oldKsm = Schema.instance.getKeyspaceMetadata(ksName);
 +        if (oldKsm == null)
 +            throw new ConfigurationException(String.format("Cannot drop non 
existing keyspace '%s'.", ksName));
 +
 +        logger.info("Drop Keyspace '{}'", oldKsm.name);
 +        announce(SchemaKeyspace.makeDropKeyspaceMutation(oldKsm, 
FBUtilities.timestampMicros()), announceLocally);
 +    }
 +
 +    public static void announceTableDrop(String ksName, String cfName) throws 
ConfigurationException
 +    {
 +        announceTableDrop(ksName, cfName, false);
 +    }
 +
 +    public static void announceTableDrop(String ksName, String cfName, 
boolean announceLocally) throws ConfigurationException
 +    {
 +        TableMetadata tm = Schema.instance.getTableMetadata(ksName, cfName);
 +        if (tm == null)
 +            throw new ConfigurationException(String.format("Cannot drop non 
existing table '%s' in keyspace '%s'.", cfName, ksName));
 +        KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(ksName);
 +
 +        logger.info("Drop table '{}/{}'", tm.keyspace, tm.name);
 +        announce(SchemaKeyspace.makeDropTableMutation(ksm, tm, 
FBUtilities.timestampMicros()), announceLocally);
 +    }
 +
 +    public static void announceViewDrop(String ksName, String viewName, 
boolean announceLocally) throws ConfigurationException
 +    {
 +        ViewMetadata view = Schema.instance.getView(ksName, viewName);
 +        if (view == null)
 +            throw new ConfigurationException(String.format("Cannot drop non 
existing materialized view '%s' in keyspace '%s'.", viewName, ksName));
 +        KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(ksName);
 +
 +        logger.info("Drop table '{}/{}'", view.keyspace, view.name);
 +        announce(SchemaKeyspace.makeDropViewMutation(ksm, view, 
FBUtilities.timestampMicros()), announceLocally);
 +    }
 +
 +    public static void announceTypeDrop(UserType droppedType, boolean 
announceLocally)
 +    {
 +        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(droppedType.keyspace);
 +        announce(SchemaKeyspace.dropTypeFromSchemaMutation(ksm, droppedType, 
FBUtilities.timestampMicros()), announceLocally);
 +    }
 +
 +    public static void announceFunctionDrop(UDFunction udf, boolean 
announceLocally)
 +    {
 +        logger.info("Drop scalar function overload '{}' args '{}'", 
udf.name(), udf.argTypes());
 +        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(udf.name().keyspace);
 +        announce(SchemaKeyspace.makeDropFunctionMutation(ksm, udf, 
FBUtilities.timestampMicros()), announceLocally);
 +    }
 +
 +    public static void announceAggregateDrop(UDAggregate udf, boolean 
announceLocally)
 +    {
 +        logger.info("Drop aggregate function overload '{}' args '{}'", 
udf.name(), udf.argTypes());
 +        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(udf.name().keyspace);
 +        announce(SchemaKeyspace.makeDropAggregateMutation(ksm, udf, 
FBUtilities.timestampMicros()), announceLocally);
 +    }
 +
 +    /**
 +     * actively announce a new version to active hosts via rpc
 +     * @param schema The schema mutation to be applied
 +     */
 +    private static void announce(Mutation.SimpleBuilder schema, boolean 
announceLocally)
 +    {
 +        List<Mutation> mutations = Collections.singletonList(schema.build());
 +
 +        if (announceLocally)
 +            Schema.instance.merge(mutations);
 +        else
 +            FBUtilities.waitOnFuture(announce(mutations));
 +    }
 +
 +    private static void pushSchemaMutation(InetAddress endpoint, 
Collection<Mutation> schema)
 +    {
 +        MessageOut<Collection<Mutation>> msg = new 
MessageOut<>(MessagingService.Verb.DEFINITIONS_UPDATE,
 +                                                                schema,
 +                                                                
MigrationsSerializer.instance);
 +        MessagingService.instance().sendOneWay(msg, endpoint);
 +    }
 +
 +    // Returns a future on the local application of the schema
 +    private static Future<?> announce(final Collection<Mutation> schema)
 +    {
 +        Future<?> f = StageManager.getStage(Stage.MIGRATION).submit(new 
WrappedRunnable()
 +        {
 +            protected void runMayThrow() throws ConfigurationException
 +            {
 +                Schema.instance.mergeAndAnnounceVersion(schema);
 +            }
 +        });
 +
 +        for (InetAddress endpoint : Gossiper.instance.getLiveMembers())
 +        {
 +            // only push schema to nodes with known and equal versions
 +            if (!endpoint.equals(FBUtilities.getBroadcastAddress()) &&
 +                    MessagingService.instance().knowsVersion(endpoint) &&
 +                    MessagingService.instance().getRawVersion(endpoint) == 
MessagingService.current_version)
 +                pushSchemaMutation(endpoint, schema);
 +        }
 +
 +        return f;
 +    }
 +
 +    /**
 +     * Announce my version passively over gossip.
 +     * Used to notify nodes as they arrive in the cluster.
 +     *
 +     * @param version The schema version to announce
 +     */
 +    static void passiveAnnounce(UUID version)
 +    {
 +        Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, 
StorageService.instance.valueFactory.schema(version));
 +        logger.debug("Gossiping my schema version {}", version);
 +    }
 +
 +    /**
 +     * Clear all locally stored schema information and reset schema to 
initial state.
 +     * Called by user (via JMX) who wants to get rid of schema disagreement.
 +     */
 +    public static void resetLocalSchema()
 +    {
 +        logger.info("Starting local schema reset...");
 +
 +        logger.debug("Truncating schema tables...");
 +
 +        SchemaKeyspace.truncate();
 +
 +        logger.debug("Clearing local schema keyspace definitions...");
 +
 +        Schema.instance.clear();
 +
 +        Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers();
 +        liveEndpoints.remove(FBUtilities.getBroadcastAddress());
 +
 +        // force migration if there are nodes around
 +        for (InetAddress node : liveEndpoints)
 +        {
 +            if (shouldPullSchemaFrom(node))
 +            {
 +                logger.debug("Requesting schema from {}", node);
 +                FBUtilities.waitOnFuture(submitMigrationTask(node));
 +                break;
 +            }
 +        }
 +
 +        logger.info("Local schema reset is complete.");
 +    }
 +
 +    public static class MigrationsSerializer implements 
IVersionedSerializer<Collection<Mutation>>
 +    {
 +        public static MigrationsSerializer instance = new 
MigrationsSerializer();
 +
 +        public void serialize(Collection<Mutation> schema, DataOutputPlus 
out, int version) throws IOException
 +        {
 +            out.writeInt(schema.size());
 +            for (Mutation mutation : schema)
 +                Mutation.serializer.serialize(mutation, out, version);
 +        }
 +
 +        public Collection<Mutation> deserialize(DataInputPlus in, int 
version) throws IOException
 +        {
 +            int count = in.readInt();
 +            Collection<Mutation> schema = new ArrayList<>(count);
 +
 +            for (int i = 0; i < count; i++)
 +                schema.add(Mutation.serializer.deserialize(in, version));
 +
 +            return schema;
 +        }
 +
 +        public long serializedSize(Collection<Mutation> schema, int version)
 +        {
 +            int size = TypeSizes.sizeof(schema.size());
 +            for (Mutation mutation : schema)
 +                size += Mutation.serializer.serializedSize(mutation, version);
 +            return size;
 +        }
 +    }
 +}

Reply via email to