http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/PreparedStatementExImpl.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/PreparedStatementExImpl.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/PreparedStatementExImpl.java new file mode 100644 index 0000000..922c1ab --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/PreparedStatementExImpl.java @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2; + +import java.io.InputStream; +import java.io.Reader; +import java.math.BigDecimal; +import java.net.URL; +import java.sql.Array; +import java.sql.Blob; +import java.sql.Clob; +import java.sql.Connection; +import java.sql.Date; +import java.sql.NClob; +import java.sql.ParameterMetaData; +import java.sql.PreparedStatement; +import java.sql.Ref; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.RowId; +import java.sql.SQLException; +import java.sql.SQLType; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.Calendar; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * PreparedStatement with extended capability to store additional meta information. + */ +@SuppressWarnings("unchecked") +final class PreparedStatementExImpl implements PreparedStatementEx { + /** */ + private final PreparedStatement delegate; + + /** */ + private Object[] meta = null; + + /** + * @param delegate Wrapped statement. + */ + public PreparedStatementExImpl(PreparedStatement delegate) { + this.delegate = delegate; + } + + /** {@inheritDoc} */ + @Override public ResultSet executeQuery() throws SQLException { + return delegate.executeQuery(); + } + + /** {@inheritDoc} */ + @Override public int executeUpdate() throws SQLException { + return delegate.executeUpdate(); + } + + /** {@inheritDoc} */ + @Override public void setNull(int parameterIndex, int sqlType) throws SQLException { + delegate.setNull(parameterIndex, sqlType); + } + + /** {@inheritDoc} */ + @Override public void setBoolean(int parameterIndex, boolean x) throws SQLException { + delegate.setBoolean(parameterIndex, x); + } + + /** {@inheritDoc} */ + @Override public void setByte(int parameterIndex, byte x) throws SQLException { + delegate.setByte(parameterIndex, x); + } + + /** {@inheritDoc} */ + @Override public void setShort(int parameterIndex, short x) throws SQLException { + delegate.setShort(parameterIndex, x); + } + + /** {@inheritDoc} */ + @Override public void setInt(int parameterIndex, int x) throws SQLException { + delegate.setInt(parameterIndex, x); + } + + /** {@inheritDoc} */ + @Override public void setLong(int parameterIndex, long x) throws SQLException { + delegate.setLong(parameterIndex, x); + } + + /** {@inheritDoc} */ + @Override public void setFloat(int parameterIndex, float x) throws SQLException { + delegate.setFloat(parameterIndex, x); + } + + /** {@inheritDoc} */ + @Override public void setDouble(int parameterIndex, double x) throws SQLException { + delegate.setDouble(parameterIndex, x); + } + + /** {@inheritDoc} */ + @Override public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException { + delegate.setBigDecimal(parameterIndex, x); + } + + /** {@inheritDoc} */ + @Override public void setString(int parameterIndex, String x) throws SQLException { + delegate.setString(parameterIndex, x); + } + + /** {@inheritDoc} */ + @Override public void setBytes(int parameterIndex, byte[] x) throws SQLException { + delegate.setBytes(parameterIndex, x); + } + + /** {@inheritDoc} */ + @Override public void setDate(int parameterIndex, Date x) throws SQLException { + delegate.setDate(parameterIndex, x); + } + + /** {@inheritDoc} */ + @Override public void setTime(int parameterIndex, Time x) throws SQLException { + delegate.setTime(parameterIndex, x); + } + + /** {@inheritDoc} */ + @Override public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException { + delegate.setTimestamp(parameterIndex, x); + } + + /** {@inheritDoc} */ + @Override public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException { + delegate.setAsciiStream(parameterIndex, x, length); + } + + /** {@inheritDoc} */ + @Deprecated + @Override public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException { + delegate.setUnicodeStream(parameterIndex, x, length); + } + + /** {@inheritDoc} */ + @Override public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException { + delegate.setBinaryStream(parameterIndex, x, length); + } + + /** {@inheritDoc} */ + @Override public void clearParameters() throws SQLException { + delegate.clearParameters(); + } + + /** {@inheritDoc} */ + @Override public void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException { + delegate.setObject(parameterIndex, x, targetSqlType); + } + + /** {@inheritDoc} */ + @Override public void setObject(int parameterIndex, Object x) throws SQLException { + delegate.setObject(parameterIndex, x); + } + + /** {@inheritDoc} */ + @Override public boolean execute() throws SQLException { + return delegate.execute(); + } + + /** {@inheritDoc} */ + @Override public void addBatch() throws SQLException { + delegate.addBatch(); + } + + /** {@inheritDoc} */ + @Override public void setCharacterStream(int parameterIndex, Reader reader, int length) throws SQLException { + delegate.setCharacterStream(parameterIndex, reader, length); + } + + /** {@inheritDoc} */ + @Override public void setRef(int parameterIndex, Ref x) throws SQLException { + delegate.setRef(parameterIndex, x); + } + + /** {@inheritDoc} */ + @Override public void setBlob(int parameterIndex, Blob x) throws SQLException { + delegate.setBlob(parameterIndex, x); + } + + /** {@inheritDoc} */ + @Override public void setClob(int parameterIndex, Clob x) throws SQLException { + delegate.setClob(parameterIndex, x); + } + + /** {@inheritDoc} */ + @Override public void setArray(int parameterIndex, Array x) throws SQLException { + delegate.setArray(parameterIndex, x); + } + + /** {@inheritDoc} */ + @Override public ResultSetMetaData getMetaData() throws SQLException { + return delegate.getMetaData(); + } + + /** {@inheritDoc} */ + @Override public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException { + delegate.setDate(parameterIndex, x, cal); + } + + /** {@inheritDoc} */ + @Override public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException { + delegate.setTime(parameterIndex, x, cal); + } + + /** {@inheritDoc} */ + @Override public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException { + delegate.setTimestamp(parameterIndex, x, cal); + } + + /** {@inheritDoc} */ + @Override public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException { + delegate.setNull(parameterIndex, sqlType, typeName); + } + + /** {@inheritDoc} */ + @Override public void setURL(int parameterIndex, URL x) throws SQLException { + delegate.setURL(parameterIndex, x); + } + + /** {@inheritDoc} */ + @Override public ParameterMetaData getParameterMetaData() throws SQLException { + return delegate.getParameterMetaData(); + } + + /** {@inheritDoc} */ + @Override public void setRowId(int parameterIndex, RowId x) throws SQLException { + delegate.setRowId(parameterIndex, x); + } + + /** {@inheritDoc} */ + @Override public void setNString(int parameterIndex, String value) throws SQLException { + delegate.setNString(parameterIndex, value); + } + + /** {@inheritDoc} */ + @Override public void setNCharacterStream(int parameterIndex, Reader value, long length) throws SQLException { + delegate.setNCharacterStream(parameterIndex, value, length); + } + + /** {@inheritDoc} */ + @Override public void setNClob(int parameterIndex, NClob value) throws SQLException { + delegate.setNClob(parameterIndex, value); + } + + /** {@inheritDoc} */ + @Override public void setClob(int parameterIndex, Reader reader, long length) throws SQLException { + delegate.setClob(parameterIndex, reader, length); + } + + /** {@inheritDoc} */ + @Override public void setBlob(int parameterIndex, InputStream inputStream, long length) throws SQLException { + delegate.setBlob(parameterIndex, inputStream, length); + } + + /** {@inheritDoc} */ + @Override public void setNClob(int parameterIndex, Reader reader, long length) throws SQLException { + delegate.setNClob(parameterIndex, reader, length); + } + + /** {@inheritDoc} */ + @Override public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException { + delegate.setSQLXML(parameterIndex, xmlObject); + } + + @Override + public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength) throws SQLException { + delegate.setObject(parameterIndex, x, targetSqlType, scaleOrLength); + } + + /** {@inheritDoc} */ + @Override public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException { + delegate.setAsciiStream(parameterIndex, x, length); + } + + /** {@inheritDoc} */ + @Override public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException { + delegate.setBinaryStream(parameterIndex, x, length); + } + + /** {@inheritDoc} */ + @Override public void setCharacterStream(int parameterIndex, Reader reader, long length) throws SQLException { + delegate.setCharacterStream(parameterIndex, reader, length); + } + + /** {@inheritDoc} */ + @Override public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException { + delegate.setAsciiStream(parameterIndex, x); + } + + /** {@inheritDoc} */ + @Override public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException { + delegate.setBinaryStream(parameterIndex, x); + } + + /** {@inheritDoc} */ + @Override public void setCharacterStream(int parameterIndex, Reader reader) throws SQLException { + delegate.setCharacterStream(parameterIndex, reader); + } + + /** {@inheritDoc} */ + @Override public void setNCharacterStream(int parameterIndex, Reader value) throws SQLException { + delegate.setNCharacterStream(parameterIndex, value); + } + + /** {@inheritDoc} */ + @Override public void setClob(int parameterIndex, Reader reader) throws SQLException { + delegate.setClob(parameterIndex, reader); + } + + /** {@inheritDoc} */ + @Override public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException { + delegate.setBlob(parameterIndex, inputStream); + } + + /** {@inheritDoc} */ + @Override public void setNClob(int parameterIndex, Reader reader) throws SQLException { + delegate.setNClob(parameterIndex, reader); + } + + /** {@inheritDoc} */ + @Override public void setObject(int parameterIndex, Object x, SQLType targetSqlType, int scaleOrLength) throws SQLException { + delegate.setObject(parameterIndex, x, targetSqlType, scaleOrLength); + } + + /** {@inheritDoc} */ + @Override public void setObject(int parameterIndex, Object x, SQLType targetSqlType) throws SQLException { + delegate.setObject(parameterIndex, x, targetSqlType); + } + + /** {@inheritDoc} */ + @Override public long executeLargeUpdate() throws SQLException { + return delegate.executeLargeUpdate(); + } + + /** {@inheritDoc} */ + @Override public ResultSet executeQuery(String sql) throws SQLException { + return delegate.executeQuery(sql); + } + + /** {@inheritDoc} */ + @Override public int executeUpdate(String sql) throws SQLException { + return delegate.executeUpdate(sql); + } + + /** {@inheritDoc} */ + @Override public void close() throws SQLException { + delegate.close(); + } + + /** {@inheritDoc} */ + @Override public int getMaxFieldSize() throws SQLException { + return delegate.getMaxFieldSize(); + } + + /** {@inheritDoc} */ + @Override public void setMaxFieldSize(int max) throws SQLException { + delegate.setMaxFieldSize(max); + } + + /** {@inheritDoc} */ + @Override public int getMaxRows() throws SQLException { + return delegate.getMaxRows(); + } + + /** {@inheritDoc} */ + @Override public void setMaxRows(int max) throws SQLException { + delegate.setMaxRows(max); + } + + /** {@inheritDoc} */ + @Override public void setEscapeProcessing(boolean enable) throws SQLException { + delegate.setEscapeProcessing(enable); + } + + /** {@inheritDoc} */ + @Override public int getQueryTimeout() throws SQLException { + return delegate.getQueryTimeout(); + } + + /** {@inheritDoc} */ + @Override public void setQueryTimeout(int seconds) throws SQLException { + delegate.setQueryTimeout(seconds); + } + + /** {@inheritDoc} */ + @Override public void cancel() throws SQLException { + delegate.cancel(); + } + + /** {@inheritDoc} */ + @Override public SQLWarning getWarnings() throws SQLException { + return delegate.getWarnings(); + } + + /** {@inheritDoc} */ + @Override public void clearWarnings() throws SQLException { + delegate.clearWarnings(); + } + + /** {@inheritDoc} */ + @Override public void setCursorName(String name) throws SQLException { + delegate.setCursorName(name); + } + + /** {@inheritDoc} */ + @Override public boolean execute(String sql) throws SQLException { + return delegate.execute(sql); + } + + /** {@inheritDoc} */ + @Override public ResultSet getResultSet() throws SQLException { + return delegate.getResultSet(); + } + + /** {@inheritDoc} */ + @Override public int getUpdateCount() throws SQLException { + return delegate.getUpdateCount(); + } + + /** {@inheritDoc} */ + @Override public boolean getMoreResults() throws SQLException { + return delegate.getMoreResults(); + } + + /** {@inheritDoc} */ + @Override public int getFetchDirection() throws SQLException { + return delegate.getFetchDirection(); + } + + /** {@inheritDoc} */ + @Override public void setFetchDirection(int direction) throws SQLException { + delegate.setFetchDirection(direction); + } + + /** {@inheritDoc} */ + @Override public int getFetchSize() throws SQLException { + return delegate.getFetchSize(); + } + + /** {@inheritDoc} */ + @Override public void setFetchSize(int rows) throws SQLException { + delegate.setFetchSize(rows); + } + + /** {@inheritDoc} */ + @Override public int getResultSetConcurrency() throws SQLException { + return delegate.getResultSetConcurrency(); + } + + /** {@inheritDoc} */ + @Override public int getResultSetType() throws SQLException { + return delegate.getResultSetType(); + } + + /** {@inheritDoc} */ + @Override public void addBatch(String sql) throws SQLException { + delegate.addBatch(sql); + } + + /** {@inheritDoc} */ + @Override public void clearBatch() throws SQLException { + delegate.clearBatch(); + } + + /** {@inheritDoc} */ + @Override public int[] executeBatch() throws SQLException { + return delegate.executeBatch(); + } + + /** {@inheritDoc} */ + @Override public Connection getConnection() throws SQLException { + return delegate.getConnection(); + } + + /** {@inheritDoc} */ + @Override public boolean getMoreResults(int current) throws SQLException { + return delegate.getMoreResults(current); + } + + /** {@inheritDoc} */ + @Override public ResultSet getGeneratedKeys() throws SQLException { + return delegate.getGeneratedKeys(); + } + + /** {@inheritDoc} */ + @Override public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException { + return delegate.executeUpdate(sql, autoGeneratedKeys); + } + + /** {@inheritDoc} */ + @Override public int executeUpdate(String sql, int[] columnIndexes) throws SQLException { + return delegate.executeUpdate(sql, columnIndexes); + } + + /** {@inheritDoc} */ + @Override public int executeUpdate(String sql, String[] columnNames) throws SQLException { + return delegate.executeUpdate(sql, columnNames); + } + + /** {@inheritDoc} */ + @Override public boolean execute(String sql, int autoGeneratedKeys) throws SQLException { + return delegate.execute(sql, autoGeneratedKeys); + } + + /** {@inheritDoc} */ + @Override public boolean execute(String sql, int[] columnIndexes) throws SQLException { + return delegate.execute(sql, columnIndexes); + } + + /** {@inheritDoc} */ + @Override public boolean execute(String sql, String[] columnNames) throws SQLException { + return delegate.execute(sql, columnNames); + } + + /** {@inheritDoc} */ + @Override public int getResultSetHoldability() throws SQLException { + return delegate.getResultSetHoldability(); + } + + /** {@inheritDoc} */ + @Override public boolean isClosed() throws SQLException { + return delegate.isClosed(); + } + + /** {@inheritDoc} */ + @Override public boolean isPoolable() throws SQLException { + return delegate.isPoolable(); + } + + /** {@inheritDoc} */ + @Override public void setPoolable(boolean poolable) throws SQLException { + delegate.setPoolable(poolable); + } + + /** {@inheritDoc} */ + @Override public void closeOnCompletion() throws SQLException { + delegate.closeOnCompletion(); + } + + /** {@inheritDoc} */ + @Override public boolean isCloseOnCompletion() throws SQLException { + return delegate.isCloseOnCompletion(); + } + + /** {@inheritDoc} */ + @Override public long getLargeUpdateCount() throws SQLException { + return delegate.getLargeUpdateCount(); + } + + /** {@inheritDoc} */ + @Override public long getLargeMaxRows() throws SQLException { + return delegate.getLargeMaxRows(); + } + + /** {@inheritDoc} */ + @Override public void setLargeMaxRows(long max) throws SQLException { + delegate.setLargeMaxRows(max); + } + + /** {@inheritDoc} */ + @Override public long[] executeLargeBatch() throws SQLException { + return delegate.executeLargeBatch(); + } + + /** {@inheritDoc} */ + @Override public long executeLargeUpdate(String sql) throws SQLException { + return delegate.executeLargeUpdate(sql); + } + + /** {@inheritDoc} */ + @Override public long executeLargeUpdate(String sql, int autoGeneratedKeys) throws SQLException { + return delegate.executeLargeUpdate(sql, autoGeneratedKeys); + } + + /** {@inheritDoc} */ + @Override public long executeLargeUpdate(String sql, int[] columnIndexes) throws SQLException { + return delegate.executeLargeUpdate(sql, columnIndexes); + } + + /** {@inheritDoc} */ + @Override public long executeLargeUpdate(String sql, String[] columnNames) throws SQLException { + return delegate.executeLargeUpdate(sql, columnNames); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <T> T unwrap(Class<T> iface) throws SQLException { + if (iface == PreparedStatementExImpl.class || iface == PreparedStatementEx.class) + return (T)this; + + return delegate.unwrap(iface); + } + + /** {@inheritDoc} */ + @Override public boolean isWrapperFor(Class<?> iface) throws SQLException { + return iface == PreparedStatementExImpl.class + || iface == PreparedStatementEx.class + || delegate.isWrapperFor(iface); + } + + /** {@inheritDoc} */ + @Override public @Nullable <T> T meta(int id) { + return meta != null && id < meta.length ? (T)meta[id] : null; + } + + /** {@inheritDoc} */ + @Override public void putMeta(int id, Object metaObj) { + if (meta == null) + meta = new Object[id + 1]; + else if (meta.length <= id) + meta = Arrays.copyOf(meta, id + 1); + + meta[id] = metaObj; + } + + /** + * + * @param stmt Prepared statement to wrap. + * @return Wrapped statement. + */ + public static PreparedStatement wrap(@NotNull PreparedStatement stmt) { + if (stmt.getClass() == PreparedStatementExImpl.class) + return stmt; + + return new PreparedStatementExImpl(stmt); + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/RebuildIndexFromHashClosure.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/RebuildIndexFromHashClosure.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/RebuildIndexFromHashClosure.java new file mode 100644 index 0000000..b635eac --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/RebuildIndexFromHashClosure.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; +import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure; + +/** */ +class RebuildIndexFromHashClosure implements SchemaIndexCacheVisitorClosure { + /** */ + private final GridCacheQueryManager qryMgr; + + /** MVCC status flag. */ + private final boolean mvccEnabled; + + /** + * @param qryMgr Query manager. + * @param mvccEnabled MVCC status flag. + */ + RebuildIndexFromHashClosure(GridCacheQueryManager qryMgr, boolean mvccEnabled) { + this.qryMgr = qryMgr; + this.mvccEnabled = mvccEnabled; + } + + /** {@inheritDoc} */ + @Override public void apply(CacheDataRow row) throws IgniteCheckedException { + // prevRowAvailable is always true with MVCC on, and always false *on index rebuild* with MVCC off. + qryMgr.store(row, null, mvccEnabled); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ResultSetEnlistFuture.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ResultSetEnlistFuture.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ResultSetEnlistFuture.java new file mode 100644 index 0000000..ee1c0fa --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ResultSetEnlistFuture.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.NoSuchElementException; +import java.util.UUID; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.DhtLockFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter; +import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.query.EnlistOperation; +import org.apache.ignite.internal.processors.query.IgniteSQLException; +import org.apache.ignite.internal.processors.query.UpdateSourceIterator; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +/** + * Future to process whole local result set of SELECT FOR UPDATE query. + */ +public interface ResultSetEnlistFuture extends DhtLockFuture<Long> { + /** + * @param rs Result set. + * @return Update source. + */ + static UpdateSourceIterator<?> createIterator(ResultSet rs) { + return new ResultSetUpdateSourceIteratorWrapper(rs); + } + + /** */ + void init(); + + /** + * + * @param nearNodeId Near node ID. + * @param nearLockVer Near lock version. + * @param mvccSnapshot Mvcc snapshot. + * @param threadId Thread ID. + * @param nearFutId Near future id. + * @param nearMiniId Near mini future id. + * @param parts Partitions. + * @param tx Transaction. + * @param timeout Lock acquisition timeout. + * @param cctx Cache context. + * @param rs Result set to process. + * @return Result set enlist future. + */ + static ResultSetEnlistFuture future(UUID nearNodeId, GridCacheVersion nearLockVer, + MvccSnapshot mvccSnapshot, long threadId, IgniteUuid nearFutId, int nearMiniId, @Nullable int[] parts, + GridDhtTxLocalAdapter tx, long timeout, GridCacheContext<?, ?> cctx, ResultSet rs) { + + if (tx.near()) + return new NearResultSetEnlistFuture(nearNodeId, nearLockVer, mvccSnapshot, threadId, nearFutId, nearMiniId, parts, tx, timeout, cctx, rs); + else + return new DhtResultSetEnlistFuture(nearNodeId, nearLockVer, mvccSnapshot, threadId, nearFutId, nearMiniId, parts, tx, timeout, cctx, rs); + } + + /** + * + */ + public static class ResultSetUpdateSourceIteratorWrapper implements UpdateSourceIterator<Object> { + /** */ + private static final long serialVersionUID = -8745196216234843471L; + + /** */ + private final ResultSet rs; + + /** */ + private Boolean hasNext; + + /** */ + private int keyColIdx; + + /** + * @param rs Result set. + */ + public ResultSetUpdateSourceIteratorWrapper(ResultSet rs) { + this.rs = rs; + keyColIdx = -1; + } + + /** {@inheritDoc} */ + @Override public EnlistOperation operation() { + return EnlistOperation.LOCK; + } + + /** {@inheritDoc} */ + @Override public boolean hasNextX() { + try { + if (hasNext == null) + hasNext = rs.next(); + + return hasNext; + } + catch (SQLException e) { + throw new IgniteSQLException(e); + } + } + + /** {@inheritDoc} */ + @Override public Object nextX() { + if (!hasNextX()) + throw new NoSuchElementException(); + + try { + if (keyColIdx == -1) + keyColIdx = rs.getMetaData().getColumnCount(); + + return rs.getObject(keyColIdx); + } + catch (SQLException e) { + throw new IgniteSQLException(e); + } + finally { + hasNext = null; + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPool.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPool.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPool.java new file mode 100644 index 0000000..25daa23 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPool.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.function.Supplier; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Special pool for managing limited number objects for further reuse. + * This pool maintains separate object bag for each thread by means of {@link ThreadLocal}. + * <p> + * If object is borrowed on one thread and recycled on different then it will be returned to + * recycling thread bag. For thread-safe use either pooled objects should be thread-safe or + * <i>happens-before</i> should be established between borrowing object and subsequent recycling. + * + * @param <E> pooled objects type + */ +public final class ThreadLocalObjectPool<E extends AutoCloseable> { + /** + * Wrapper for a pooled object with capability to return the object to a pool. + * + * @param <T> enclosed object type + */ + public static class Reusable<T extends AutoCloseable> { + /** */ + private final ThreadLocalObjectPool<T> pool; + /** */ + private final T object; + + /** */ + private Reusable(ThreadLocalObjectPool<T> pool, T object) { + this.pool = pool; + this.object = object; + } + + /** + * @return enclosed object + */ + public T object() { + return object; + } + + /** + * Returns an object to a pool or closes it if the pool is already full. + */ + public void recycle() { + Queue<Reusable<T>> bag = pool.bag.get(); + if (bag.size() < pool.poolSize) + bag.add(this); + else + U.closeQuiet(object); + } + } + + /** */ + private final Supplier<E> objectFactory; + /** */ + private final ThreadLocal<Queue<Reusable<E>>> bag = ThreadLocal.withInitial(LinkedList::new); + /** */ + private final int poolSize; + + /** + * @param objectFactory factory used for new objects creation + * @param poolSize number of objects which pool can contain + */ + public ThreadLocalObjectPool(Supplier<E> objectFactory, int poolSize) { + this.objectFactory = objectFactory; + this.poolSize = poolSize; + } + + /** + * Picks an object from the pool if one is present or creates new one otherwise. + * Returns an object wrapper which could be returned to the pool. + * + * @return reusable object wrapper + */ + public Reusable<E> borrow() { + Reusable<E> pooled = bag.get().poll(); + return pooled != null ? pooled : new Reusable<>(this, objectFactory.get()); + } + + /** Visible for test */ + int bagSize() { + return bag.get().size(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java index 546f5bb..5d877cd 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.h2.database; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -26,9 +27,12 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter; @@ -84,31 +88,30 @@ public class H2PkHashIndex extends GridH2IndexBase { /** {@inheritDoc} */ @Override public Cursor find(Session ses, final SearchRow lower, final SearchRow upper) { - IndexingQueryFilter f = threadLocalFilter(); - IndexingQueryCacheFilter p = null; + IndexingQueryCacheFilter filter = null; + MvccSnapshot mvccSnapshot = null; - if (f != null) { - String cacheName = getTable().cacheName(); + GridH2QueryContext qctx = GridH2QueryContext.get(); - p = f.forCache(cacheName); + if (qctx != null) { + IndexingQueryFilter f = qctx.filter(); + filter = f != null ? f.forCache(getTable().cacheName()) : null; + mvccSnapshot = qctx.mvccSnapshot(); } - KeyCacheObject lowerObj = null; - KeyCacheObject upperObj = null; + assert !cctx.mvccEnabled() || mvccSnapshot != null; - if (lower != null) - lowerObj = cctx.toCacheKeyObject(lower.getValue(0).getObject()); - - if (upper != null) - upperObj = cctx.toCacheKeyObject(upper.getValue(0).getObject()); + KeyCacheObject lowerObj = lower != null ? cctx.toCacheKeyObject(lower.getValue(0).getObject()) : null; + KeyCacheObject upperObj = upper != null ? cctx.toCacheKeyObject(upper.getValue(0).getObject()) : null; try { - List<GridCursor<? extends CacheDataRow>> cursors = new ArrayList<>(); + Collection<GridCursor<? extends CacheDataRow>> cursors = new ArrayList<>(); for (IgniteCacheOffheapManager.CacheDataStore store : cctx.offheap().cacheDataStores()) - cursors.add(store.cursor(cctx.cacheId(), lowerObj, upperObj)); + if (filter == null || filter.applyPartition(store.partId())) + cursors.add(store.cursor(cctx.cacheId(), lowerObj, upperObj, null, mvccSnapshot)); - return new H2Cursor(new CompositeGridCursor<>(cursors.iterator()), p); + return new H2Cursor(cursors.iterator()); } catch (IgniteCheckedException e) { throw DbException.convert(e); @@ -124,7 +127,6 @@ public class H2PkHashIndex extends GridH2IndexBase { @SuppressWarnings("StatementWithEmptyBody") @Override public GridH2Row put(GridH2Row row) { // Should not be called directly. Rows are inserted into underlying cache data stores. - assert false; throw DbException.getUnsupportedException("put"); @@ -192,28 +194,29 @@ public class H2PkHashIndex extends GridH2IndexBase { */ private class H2Cursor implements Cursor { /** */ - final GridCursor<? extends CacheDataRow> cursor; + private final GridH2RowDescriptor desc; /** */ - final IndexingQueryCacheFilter filter; + private final Iterator<GridCursor<? extends CacheDataRow>> iter; + + /** */ + private GridCursor<? extends CacheDataRow> curr; /** - * @param cursor Cursor. - * @param filter Filter. + * @param iter Cursors iterator. */ - private H2Cursor(GridCursor<? extends CacheDataRow> cursor, IndexingQueryCacheFilter filter) { - assert cursor != null; + private H2Cursor(Iterator<GridCursor<? extends CacheDataRow>> iter) { + assert iter != null; - this.cursor = cursor; - this.filter = filter; + this.iter = iter; + + desc = tbl.rowDescriptor(); } /** {@inheritDoc} */ @Override public Row get() { try { - CacheDataRow dataRow = cursor.get(); - - return tbl.rowDescriptor().createRow(dataRow); + return desc.createRow(curr.get()); } catch (IgniteCheckedException e) { throw DbException.convert(e); @@ -228,13 +231,13 @@ public class H2PkHashIndex extends GridH2IndexBase { /** {@inheritDoc} */ @Override public boolean next() { try { - while (cursor.next()) { - if (filter == null) - return true; + if (curr != null && curr.next()) + return true; - CacheDataRow dataRow = cursor.get(); + while (iter.hasNext()) { + curr = iter.next(); - if (filter.applyPartition(dataRow.partition())) + if (curr.next()) return true; } @@ -250,45 +253,4 @@ public class H2PkHashIndex extends GridH2IndexBase { throw DbException.getUnsupportedException("previous"); } } - - /** - * - */ - private static class CompositeGridCursor<T> implements GridCursor<T> { - /** */ - private final Iterator<GridCursor<? extends T>> iter; - - /** */ - private GridCursor<? extends T> curr; - - /** - * - */ - public CompositeGridCursor(Iterator<GridCursor<? extends T>> iter) { - this.iter = iter; - - if (iter.hasNext()) - curr = iter.next(); - } - - /** {@inheritDoc} */ - @Override public boolean next() throws IgniteCheckedException { - if (curr.next()) - return true; - - while (iter.hasNext()) { - curr = iter.next(); - - if (curr.next()) - return true; - } - - return false; - } - - /** {@inheritDoc} */ - @Override public T get() throws IgniteCheckedException { - return curr.get(); - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java index 40b9b0a..724de7e 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java @@ -18,9 +18,10 @@ package org.apache.ignite.internal.processors.query.h2.database; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter; +import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataRow; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row; import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor; @@ -60,17 +61,23 @@ public class H2RowFactory { rowBuilder.initFromLink(cctx.group(), CacheDataRowAdapter.RowData.FULL); - GridH2Row row; - - try { - row = rowDesc.createRow(rowBuilder); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } + GridH2Row row = rowDesc.createRow(rowBuilder); assert row.version() != null; return row; } + + /** + * @param link Link. + * @param mvccCrdVer Mvcc coordinator version. + * @param mvccCntr Mvcc counter. + * @param mvccOpCntr Mvcc operation counter. + * @return Row. + * @throws IgniteCheckedException If failed. + */ + public GridH2Row getMvccRow(long link, long mvccCrdVer, long mvccCntr, int mvccOpCntr) throws IgniteCheckedException { + return rowDesc.createRow(new MvccDataRow(cctx.group(),0, link, + PageIdUtils.partId(PageIdUtils.pageId(link)),null, mvccCrdVer, mvccCntr, mvccOpCntr)); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java index 424969e..ce40df0 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java @@ -21,9 +21,9 @@ import java.util.Comparator; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; +import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusMetaIO; @@ -35,8 +35,9 @@ import org.apache.ignite.internal.processors.query.h2.database.io.H2ExtrasLeafIO import org.apache.ignite.internal.processors.query.h2.database.io.H2RowLinkIO; import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter; import org.h2.result.SearchRow; import org.h2.table.IndexColumn; import org.h2.value.Value; @@ -44,7 +45,7 @@ import org.jetbrains.annotations.Nullable; /** */ -public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> { +public abstract class H2Tree extends BPlusTree<GridH2SearchRow, GridH2Row> { /** */ private final H2RowFactory rowStore; @@ -61,6 +62,9 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> { private final int[] columnIds; /** */ + private final boolean mvccEnabled; + + /** */ private final Comparator<Value> comp = new Comparator<Value>() { @Override public int compare(Value o1, Value o2) { return compareValues(o1, o2); @@ -82,6 +86,7 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> { * @param metaPageId Meta page ID. * @param initNew Initialize new index. * @param rowCache Row cache. + * @param mvccEnabled Mvcc flag. * @param failureProcessor if the tree is corrupted. * @throws IgniteCheckedException If failed. */ @@ -98,6 +103,7 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> { IndexColumn[] cols, List<InlineIndexHelper> inlineIdxs, int inlineSize, + boolean mvccEnabled, @Nullable H2RowCache rowCache, @Nullable FailureProcessor failureProcessor ) throws IgniteCheckedException { @@ -109,6 +115,7 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> { } this.inlineSize = inlineSize; + this.mvccEnabled = mvccEnabled; assert rowStore != null; @@ -121,9 +128,9 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> { for (int i = 0; i < cols.length; i++) columnIds[i] = cols[i].column.getColumnId(); - this.rowCache = rowCache; + setIos(H2ExtrasInnerIO.getVersions(inlineSize, mvccEnabled), H2ExtrasLeafIO.getVersions(inlineSize, mvccEnabled)); - setIos(H2ExtrasInnerIO.getVersions(inlineSize), H2ExtrasLeafIO.getVersions(inlineSize)); + this.rowCache = rowCache; initTree(initNew, inlineSize); } @@ -152,21 +159,34 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> { return rowStore.getRow(link); } - /** {@inheritDoc} */ - @Override protected GridH2Row getRow(BPlusIO<SearchRow> io, long pageAddr, int idx, Object filter) - throws IgniteCheckedException { - if (filter != null) { - // Filter out not interesting partitions without deserializing the row. - IndexingQueryCacheFilter filter0 = (IndexingQueryCacheFilter)filter; + /** + * Create row from link. + * + * @param link Link. + * @param mvccOpCntr + * @return Row. + * @throws IgniteCheckedException if failed. + */ + public GridH2Row createRowFromLink(long link, long mvccCrdVer, long mvccCntr, int mvccOpCntr) throws IgniteCheckedException { + if (rowCache != null) { + GridH2Row row = rowCache.get(link); - long link = ((H2RowLinkIO)io).getLink(pageAddr, idx); + if (row == null) { + row = rowStore.getMvccRow(link, mvccCrdVer, mvccCntr, mvccOpCntr); - int part = PageIdUtils.partId(PageIdUtils.pageId(link)); + if (row instanceof GridH2KeyValueRowOnheap) + rowCache.put((GridH2KeyValueRowOnheap)row); + } - if (!filter0.applyPartition(part)) - return null; + return row; } + else + return rowStore.getMvccRow(link, mvccCrdVer, mvccCntr, mvccOpCntr); + } + /** {@inheritDoc} */ + @Override public GridH2Row getRow(BPlusIO<GridH2SearchRow> io, long pageAddr, int idx, Object ignore) + throws IgniteCheckedException { return (GridH2Row)io.getLookupRow(this, pageAddr, idx); } @@ -206,8 +226,8 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> { /** {@inheritDoc} */ @SuppressWarnings("ForLoopReplaceableByForEach") - @Override protected int compare(BPlusIO<SearchRow> io, long pageAddr, int idx, - SearchRow row) throws IgniteCheckedException { + @Override protected int compare(BPlusIO<GridH2SearchRow> io, long pageAddr, int idx, + GridH2SearchRow row) throws IgniteCheckedException { if (inlineSize() == 0) return compareRows(getRow(io, pageAddr, idx), row); else { @@ -242,7 +262,7 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> { } if (lastIdxUsed == cols.length) - return 0; + return mvccCompare((H2RowLinkIO)io, pageAddr, idx, row); SearchRow rowData = getRow(io, pageAddr, idx); @@ -254,7 +274,7 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> { if (v2 == null) { // Can't compare further. - return 0; + return mvccCompare((H2RowLinkIO)io, pageAddr, idx, row); } Value v1 = rowData.getValue(idx0); @@ -265,7 +285,7 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> { return InlineIndexHelper.fixSort(c, col.sortType); } - return 0; + return mvccCompare((H2RowLinkIO)io, pageAddr, idx, row); } } @@ -276,7 +296,8 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> { * @param r2 Row 2. * @return Compare result: see {@link Comparator#compare(Object, Object)} for values. */ - public int compareRows(SearchRow r1, SearchRow r2) { + public int compareRows(GridH2SearchRow r1, GridH2SearchRow r2) { + assert !mvccEnabled || r2.indexSearchRow() || MvccUtils.mvccVersionIsValid(r2.mvccCoordinatorVersion(), r2.mvccCounter()) : r2; if (r1 == r2) return 0; @@ -288,7 +309,7 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> { if (v1 == null || v2 == null) { // Can't compare further. - return 0; + return mvccCompare(r1, r2); } int c = compareValues(v1, v2); @@ -297,7 +318,47 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> { return InlineIndexHelper.fixSort(c, cols[i].sortType); } - return 0; + return mvccCompare(r1, r2); + } + + /** + * @param io IO. + * @param pageAddr Page address. + * @param idx Item index. + * @param r2 Search row. + * @return Comparison result. + */ + private int mvccCompare(H2RowLinkIO io, long pageAddr, int idx, GridH2SearchRow r2) { + if (!mvccEnabled || r2.indexSearchRow()) + return 0; + + long crd = io.getMvccCoordinatorVersion(pageAddr, idx); + long cntr = io.getMvccCounter(pageAddr, idx); + int opCntr = io.getMvccOperationCounter(pageAddr, idx); + + assert MvccUtils.mvccVersionIsValid(crd, cntr, opCntr); + + return -MvccUtils.compare(crd, cntr, opCntr, r2); // descending order + } + + /** + * @param r1 First row. + * @param r2 Second row. + * @return Comparison result. + */ + private int mvccCompare(GridH2SearchRow r1, GridH2SearchRow r2) { + if (!mvccEnabled || r2.indexSearchRow()) + return 0; + + long crdVer1 = r1.mvccCoordinatorVersion(); + long crdVer2 = r2.mvccCoordinatorVersion(); + + int c = -Long.compare(crdVer1, crdVer2); + + if (c != 0) + return c; + + return -Long.compare(r1.mvccCounter(), r2.mvccCounter()); } /** @@ -306,4 +367,9 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> { * @return Comparison result. */ public abstract int compareValues(Value v1, Value v2); + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(H2Tree.class, this, "super", super.toString()); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeFilterClosure.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeFilterClosure.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeFilterClosure.java new file mode 100644 index 0000000..e583546 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeFilterClosure.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.database; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.PageIdUtils; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; +import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO; +import org.apache.ignite.internal.processors.query.h2.database.io.H2RowLinkIO; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow; +import org.apache.ignite.internal.transactions.IgniteTxMvccVersionCheckedException; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter; + +import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId; +import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.isVisible; +import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccVersionIsValid; + +/** + * + */ +public class H2TreeFilterClosure implements H2Tree.TreeRowClosure<GridH2SearchRow, GridH2Row> { + /** */ + private final MvccSnapshot mvccSnapshot; + + /** */ + private final IndexingQueryCacheFilter filter; + + /** */ + private final GridCacheContext cctx; + + /** + * @param filter Cache filter. + * @param mvccSnapshot MVCC snapshot. + * @param cctx Cache context. + */ + public H2TreeFilterClosure(IndexingQueryCacheFilter filter, MvccSnapshot mvccSnapshot, GridCacheContext cctx) { + assert (filter != null || mvccSnapshot != null) && cctx != null ; + + this.filter = filter; + this.mvccSnapshot = mvccSnapshot; + this.cctx = cctx; + } + + /** {@inheritDoc} */ + @Override public boolean apply(BPlusTree<GridH2SearchRow, GridH2Row> tree, BPlusIO<GridH2SearchRow> io, + long pageAddr, int idx) throws IgniteCheckedException { + return (filter == null || applyFilter((H2RowLinkIO)io, pageAddr, idx)) + && (mvccSnapshot == null || applyMvcc((H2RowLinkIO)io, pageAddr, idx)); + } + + /** + * @param io Row IO. + * @param pageAddr Page address. + * @param idx Item index. + * @return {@code True} if row passes the filter. + */ + private boolean applyFilter(H2RowLinkIO io, long pageAddr, int idx) { + assert filter != null; + + return filter.applyPartition(PageIdUtils.partId(pageId(io.getLink(pageAddr, idx)))); + } + + /** + * @param io Row IO. + * @param pageAddr Page address. + * @param idx Item index. + * @return {@code True} if row passes the filter. + */ + private boolean applyMvcc(H2RowLinkIO io, long pageAddr, int idx) throws IgniteCheckedException { + assert io.storeMvccInfo() : io; + + long rowCrdVer = io.getMvccCoordinatorVersion(pageAddr, idx); + long rowCntr = io.getMvccCounter(pageAddr, idx); + int rowOpCntr = io.getMvccOperationCounter(pageAddr, idx); + + assert mvccVersionIsValid(rowCrdVer, rowCntr, rowOpCntr); + + try { + return isVisible(cctx, mvccSnapshot, rowCrdVer, rowCntr, rowOpCntr, io.getLink(pageAddr, idx)); + } + catch (IgniteTxMvccVersionCheckedException ignored) { + return false; // The row is going to be removed. + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(H2TreeFilterClosure.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java index 393ca3b..ab6f42a 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java @@ -20,36 +20,34 @@ package org.apache.ignite.internal.processors.query.h2.database; import java.util.ArrayList; import java.util.HashSet; import java.util.List; -import java.util.NoSuchElementException; - import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteSystemProperties; -import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.RootPage; import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; -import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.query.h2.H2Cursor; import org.apache.ignite.internal.processors.query.h2.H2RowCache; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor; import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase; -import org.apache.ignite.internal.processors.query.h2.database.io.H2RowLinkIO; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.apache.ignite.internal.util.IgniteTree; import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.spi.indexing.IndexingQueryFilter; import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter; +import org.apache.ignite.spi.indexing.IndexingQueryFilter; import org.h2.engine.Session; import org.h2.index.Cursor; import org.h2.index.IndexType; import org.h2.index.SingleRowCursor; import org.h2.message.DbException; -import org.h2.result.Row; import org.h2.result.SearchRow; import org.h2.result.SortOrder; import org.h2.table.Column; @@ -139,6 +137,7 @@ public class H2TreeIndex extends GridH2IndexBase { cols, inlineIdxs, computeInlineSize(inlineIdxs, inlineSize), + cctx.mvccEnabled(), rowCache, cctx.kernalContext().failure()) { @Override public int compareValues(Value v1, Value v2) { @@ -191,21 +190,22 @@ public class H2TreeIndex extends GridH2IndexBase { /** {@inheritDoc} */ @Override public Cursor find(Session ses, SearchRow lower, SearchRow upper) { try { - IndexingQueryCacheFilter filter = partitionFilter(threadLocalFilter()); + assert lower == null || lower instanceof GridH2SearchRow : lower; + assert upper == null || upper instanceof GridH2SearchRow : upper; int seg = threadLocalSegment(); H2Tree tree = treeForRead(seg); - if (indexType.isPrimaryKey() && lower != null && upper != null && tree.compareRows(lower, upper) == 0) { - GridH2Row row = tree.findOne(lower, filter); + if (!cctx.mvccEnabled() && indexType.isPrimaryKey() && lower != null && upper != null && + tree.compareRows((GridH2SearchRow)lower, (GridH2SearchRow)upper) == 0) { + GridH2Row row = tree.findOne((GridH2SearchRow)lower, filter(GridH2QueryContext.get()), null); - return (row == null) ? EMPTY_CURSOR : new SingleRowCursor(row); + return (row == null) ? GridH2Cursor.EMPTY : new SingleRowCursor(row); } else { - GridCursor<GridH2Row> cursor = tree.find(lower, upper, filter); - - return new H2Cursor(cursor); + return new H2Cursor(tree.find((GridH2SearchRow)lower, + (GridH2SearchRow)upper, filter(GridH2QueryContext.get()), null)); } } catch (IgniteCheckedException e) { @@ -257,6 +257,8 @@ public class H2TreeIndex extends GridH2IndexBase { /** {@inheritDoc} */ @Override public GridH2Row remove(SearchRow row) { + assert row instanceof GridH2SearchRow : row; + try { InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs); @@ -266,7 +268,7 @@ public class H2TreeIndex extends GridH2IndexBase { assert cctx.shared().database().checkpointLockIsHeldByThread(); - return tree.remove(row); + return tree.remove((GridH2SearchRow)row); } catch (IgniteCheckedException e) { throw DbException.convert(e); @@ -279,6 +281,8 @@ public class H2TreeIndex extends GridH2IndexBase { /** {@inheritDoc} */ @Override public boolean removex(SearchRow row) { try { + assert row instanceof GridH2SearchRow : row; + InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs); int seg = segmentForRow(row); @@ -287,7 +291,7 @@ public class H2TreeIndex extends GridH2IndexBase { assert cctx.shared().database().checkpointLockIsHeldByThread(); - return tree.removex(row); + return tree.removex((GridH2SearchRow)row); } catch (IgniteCheckedException e) { throw DbException.convert(e); @@ -315,9 +319,9 @@ public class H2TreeIndex extends GridH2IndexBase { H2Tree tree = treeForRead(seg); - BPlusTree.TreeRowClosure<SearchRow, GridH2Row> filter = filterClosure(); + GridH2QueryContext qctx = GridH2QueryContext.get(); - return tree.size(filter); + return tree.size(filter(qctx)); } catch (IgniteCheckedException e) { throw DbException.convert(e); @@ -337,13 +341,10 @@ public class H2TreeIndex extends GridH2IndexBase { /** {@inheritDoc} */ @Override public Cursor findFirstOrLast(Session session, boolean b) { try { - int seg = threadLocalSegment(); + H2Tree tree = treeForRead(threadLocalSegment()); + GridH2QueryContext qctx = GridH2QueryContext.get(); - H2Tree tree = treeForRead(seg); - - GridH2Row row = b ? tree.findFirst(): tree.findLast(); - - return new SingleRowCursor(row); + return new SingleRowCursor(b ? tree.findFirst(filter(qctx)): tree.findLast(filter(qctx))); } catch (IgniteCheckedException e) { throw DbException.convert(e); @@ -382,16 +383,13 @@ public class H2TreeIndex extends GridH2IndexBase { @Override protected H2Cursor doFind0( IgniteTree t, @Nullable SearchRow first, - boolean includeFirst, @Nullable SearchRow last, - IndexingQueryFilter filter) { + BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter) { try { - IndexingQueryCacheFilter pf = partitionFilter(filter); - - GridCursor<GridH2Row> range = t.find(first, last, pf); + GridCursor<GridH2Row> range = ((BPlusTree)t).find(first, last, filter, null); if (range == null) - range = GridH2IndexBase.EMPTY_CURSOR; + range = EMPTY_CURSOR; return new H2Cursor(range); } @@ -400,6 +398,26 @@ public class H2TreeIndex extends GridH2IndexBase { } } + /** {@inheritDoc} */ + @Override protected BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter(GridH2QueryContext qctx) { + if (qctx == null) { + assert !cctx.mvccEnabled(); + + return null; + } + + IndexingQueryFilter f = qctx.filter(); + IndexingQueryCacheFilter p = f == null ? null : f.forCache(getTable().cacheName()); + MvccSnapshot v = qctx.mvccSnapshot(); + + assert !cctx.mvccEnabled() || v != null; + + if(p == null && v == null) + return null; + + return new H2TreeFilterClosure(p, v, cctx); + } + /** * @param inlineIdxs Inline index helpers. * @param cfgInlineSize Inline size from cache config. @@ -457,63 +475,6 @@ public class H2TreeIndex extends GridH2IndexBase { cctx.offheap().dropRootPageForIndex(cctx.cacheId(), name + "%" + segIdx); } - /** - * Returns a filter which returns true for entries belonging to a particular partition. - * - * @param qryFilter Factory that creates a predicate for filtering entries for a particular cache. - * @return The filter or null if the filter is not needed (e.g., if the cache is not partitioned). - */ - @Nullable private IndexingQueryCacheFilter partitionFilter(@Nullable IndexingQueryFilter qryFilter) { - if (qryFilter == null) - return null; - - String cacheName = getTable().cacheName(); - - return qryFilter.forCache(cacheName); - } - - /** - * An adapter from {@link IndexingQueryCacheFilter} to {@link BPlusTree.TreeRowClosure} which - * filters entries that belong to the current partition. - */ - private static class PartitionFilterTreeRowClosure implements BPlusTree.TreeRowClosure<SearchRow, GridH2Row> { - /** Filter. */ - private final IndexingQueryCacheFilter filter; - - /** - * Creates a {@link BPlusTree.TreeRowClosure} adapter based on the given partition filter. - * - * @param filter The partition filter. - */ - public PartitionFilterTreeRowClosure(IndexingQueryCacheFilter filter) { - this.filter = filter; - } - - /** {@inheritDoc} */ - @Override public boolean apply(BPlusTree<SearchRow, GridH2Row> tree, - BPlusIO<SearchRow> io, long pageAddr, int idx) throws IgniteCheckedException { - - H2RowLinkIO h2io = (H2RowLinkIO)io; - - return filter.applyPartition( - PageIdUtils.partId( - PageIdUtils.pageId( - h2io.getLink(pageAddr, idx)))); - } - } - - /** - * Returns a filter to apply to rows in the current index to obtain only the - * ones owned by the this cache. - * - * @return The filter, which returns true for rows owned by this cache. - */ - @Nullable private BPlusTree.TreeRowClosure<SearchRow, GridH2Row> filterClosure() { - final IndexingQueryCacheFilter filter = partitionFilter(threadLocalFilter()); - - return filter != null ? new PartitionFilterTreeRowClosure(filter) : null; - } - /** {@inheritDoc} */ @Override public void refreshColumnIds() { super.refreshColumnIds(); @@ -528,29 +489,4 @@ public class H2TreeIndex extends GridH2IndexBase { for (int pos = 0; pos < inlineHelpers.size(); ++pos) inlineIdxs.set(pos, inlineHelpers.get(pos)); } - - /** - * Empty cursor. - */ - public static final Cursor EMPTY_CURSOR = new Cursor() { - /** {@inheritDoc} */ - @Override public Row get() { - throw DbException.convert(new NoSuchElementException("Empty cursor")); - } - - /** {@inheritDoc} */ - @Override public SearchRow getSearchRow() { - throw DbException.convert(new NoSuchElementException("Empty cursor")); - } - - /** {@inheritDoc} */ - @Override public boolean next() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean previous() { - return false; - } - }; } http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2ExtrasInnerIO.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2ExtrasInnerIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2ExtrasInnerIO.java new file mode 100644 index 0000000..fbca917 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2ExtrasInnerIO.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.database.io; + +import java.util.List; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.PageUtils; +import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; +import org.apache.ignite.internal.processors.query.h2.database.H2Tree; +import org.apache.ignite.internal.processors.query.h2.database.InlineIndexHelper; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow; + +/** + * Inner page for H2 row references. + */ +public abstract class AbstractH2ExtrasInnerIO extends BPlusInnerIO<GridH2SearchRow> implements H2RowLinkIO { + /** Payload size. */ + protected final int payloadSize; + + /** */ + public static void register() { + register(false); + + register(true); + } + + /** + * @param mvcc Mvcc flag. + */ + private static void register(boolean mvcc) { + short type = mvcc ? PageIO.T_H2_EX_REF_MVCC_INNER_START : PageIO.T_H2_EX_REF_INNER_START; + + for (short payload = 1; payload <= PageIO.MAX_PAYLOAD_SIZE; payload++) { + IOVersions<? extends AbstractH2ExtrasInnerIO> io = + getVersions((short)(type + payload - 1), payload, mvcc); + + PageIO.registerH2ExtraInner(io, mvcc); + } + } + + /** + * @param payload Payload size. + * @param mvccEnabled Mvcc flag. + * @return IOVersions for given payload. + */ + @SuppressWarnings("unchecked") + public static IOVersions<? extends BPlusInnerIO<GridH2SearchRow>> getVersions(int payload, boolean mvccEnabled) { + assert payload >= 0 && payload <= PageIO.MAX_PAYLOAD_SIZE; + + if (payload == 0) + return mvccEnabled ? H2MvccInnerIO.VERSIONS : H2InnerIO.VERSIONS; + else + return (IOVersions<BPlusInnerIO<GridH2SearchRow>>)PageIO.getInnerVersions((short)(payload - 1), mvccEnabled); + } + + /** + * @param type Type. + * @param payload Payload size. + * @param mvcc Mvcc flag. + * @return Instance of IO versions. + */ + private static IOVersions<? extends AbstractH2ExtrasInnerIO> getVersions(short type, short payload, boolean mvcc) { + return new IOVersions<>(mvcc ? new H2MvccExtrasInnerIO(type, 1, payload) : new H2ExtrasInnerIO(type, 1, payload)); + } + + /** + * @param type Page type. + * @param ver Page format version. + * @param itemSize Item size. + * @param payloadSize Payload size. + */ + AbstractH2ExtrasInnerIO(short type, int ver, int itemSize, int payloadSize) { + super(type, ver, true, itemSize + payloadSize); + + this.payloadSize = payloadSize; + } + + /** {@inheritDoc} */ + @SuppressWarnings("ForLoopReplaceableByForEach") + @Override public final void storeByOffset(long pageAddr, int off, GridH2SearchRow row) { + GridH2Row row0 = (GridH2Row)row; + + assert row0.link() != 0 : row0; + + List<InlineIndexHelper> inlineIdxs = InlineIndexHelper.getCurrentInlineIndexes(); + + assert inlineIdxs != null : "no inline index helpers"; + + + int fieldOff = 0; + + for (int i = 0; i < inlineIdxs.size(); i++) { + InlineIndexHelper idx = inlineIdxs.get(i); + + int size = idx.put(pageAddr, off + fieldOff, row.getValue(idx.columnIndex()), payloadSize - fieldOff); + + if (size == 0) + break; + + fieldOff += size; + } + + H2IOUtils.storeRow(row0, pageAddr, off + payloadSize, storeMvccInfo()); + } + + /** {@inheritDoc} */ + @Override public final GridH2SearchRow getLookupRow(BPlusTree<GridH2SearchRow, ?> tree, long pageAddr, int idx) + throws IgniteCheckedException { + long link = getLink(pageAddr, idx); + + assert link != 0; + + if (storeMvccInfo()) { + long mvccCrdVer = getMvccCoordinatorVersion(pageAddr, idx); + long mvccCntr = getMvccCounter(pageAddr, idx); + int mvccOpCntr = getMvccOperationCounter(pageAddr, idx); + + return ((H2Tree)tree).createRowFromLink(link, mvccCrdVer, mvccCntr, mvccOpCntr); + } + + return ((H2Tree)tree).createRowFromLink(link); + } + + /** {@inheritDoc} */ + @Override public final void store(long dstPageAddr, int dstIdx, BPlusIO<GridH2SearchRow> srcIo, long srcPageAddr, int srcIdx) { + int srcOff = srcIo.offset(srcIdx); + + byte[] payload = PageUtils.getBytes(srcPageAddr, srcOff, payloadSize); + long link = PageUtils.getLong(srcPageAddr, srcOff + payloadSize); + + assert link != 0; + + int dstOff = offset(dstIdx); + + PageUtils.putBytes(dstPageAddr, dstOff, payload); + + H2IOUtils.store(dstPageAddr, dstOff + payloadSize, srcIo, srcPageAddr, srcIdx, storeMvccInfo()); + } + + /** {@inheritDoc} */ + @Override public final long getLink(long pageAddr, int idx) { + return PageUtils.getLong(pageAddr, offset(idx) + payloadSize); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2ExtrasLeafIO.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2ExtrasLeafIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2ExtrasLeafIO.java new file mode 100644 index 0000000..9132795 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2ExtrasLeafIO.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.database.io; + +import java.util.List; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.PageUtils; +import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeafIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; +import org.apache.ignite.internal.processors.query.h2.database.H2Tree; +import org.apache.ignite.internal.processors.query.h2.database.InlineIndexHelper; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow; + +/** + * Leaf page for H2 row references. + */ +public abstract class AbstractH2ExtrasLeafIO extends BPlusLeafIO<GridH2SearchRow> implements H2RowLinkIO { + /** Payload size. */ + protected final int payloadSize; + + /** */ + public static void register() { + register(false); + + register(true); + } + + /** + * @param mvcc Mvcc flag. + */ + private static void register(boolean mvcc) { + short type = mvcc ? PageIO.T_H2_EX_REF_MVCC_LEAF_START : PageIO.T_H2_EX_REF_LEAF_START; + + for (short payload = 1; payload <= PageIO.MAX_PAYLOAD_SIZE; payload++) { + IOVersions<? extends AbstractH2ExtrasLeafIO> io = + getVersions((short)(type + payload - 1), payload, mvcc); + + PageIO.registerH2ExtraLeaf(io, mvcc); + } + } + + /** + * @param payload Payload size. + * @param mvccEnabled Mvcc flag. + * @return IOVersions for given payload. + */ + @SuppressWarnings("unchecked") + public static IOVersions<? extends BPlusLeafIO<GridH2SearchRow>> getVersions(int payload, boolean mvccEnabled) { + assert payload >= 0 && payload <= PageIO.MAX_PAYLOAD_SIZE; + + if (payload == 0) + return mvccEnabled ? H2MvccLeafIO.VERSIONS : H2LeafIO.VERSIONS; + else + return (IOVersions<BPlusLeafIO<GridH2SearchRow>>)PageIO.getLeafVersions((short)(payload - 1), mvccEnabled); + } + + /** + * @param type Type. + * @param payload Payload size. + * @param mvcc Mvcc flag. + * @return Versions. + */ + private static IOVersions<? extends AbstractH2ExtrasLeafIO> getVersions(short type, short payload, boolean mvcc) { + return new IOVersions<>(mvcc ? new H2MvccExtrasLeafIO(type, 1, payload) : new H2ExtrasLeafIO(type, 1, payload)); + } + + /** + * @param type Page type. + * @param ver Page format version. + * @param itemSize Item size. + * @param payloadSize Payload size. + */ + AbstractH2ExtrasLeafIO(short type, int ver, int itemSize, int payloadSize) { + super(type, ver, itemSize + payloadSize); + + this.payloadSize = payloadSize; + } + + /** {@inheritDoc} */ + @SuppressWarnings("ForLoopReplaceableByForEach") + @Override public final void storeByOffset(long pageAddr, int off, GridH2SearchRow row) { + GridH2Row row0 = (GridH2Row)row; + + assert row0.link() != 0; + + List<InlineIndexHelper> inlineIdxs = InlineIndexHelper.getCurrentInlineIndexes(); + + assert inlineIdxs != null : "no inline index helpers"; + + int fieldOff = 0; + + for (int i = 0; i < inlineIdxs.size(); i++) { + InlineIndexHelper idx = inlineIdxs.get(i); + + int size = idx.put(pageAddr, off + fieldOff, row.getValue(idx.columnIndex()), payloadSize - fieldOff); + + if (size == 0) + break; + + fieldOff += size; + } + + H2IOUtils.storeRow(row0, pageAddr, off + payloadSize, storeMvccInfo()); + } + + /** {@inheritDoc} */ + @Override public final void store(long dstPageAddr, int dstIdx, BPlusIO<GridH2SearchRow> srcIo, long srcPageAddr, int srcIdx) { + int srcOff = srcIo.offset(srcIdx); + + byte[] payload = PageUtils.getBytes(srcPageAddr, srcOff, payloadSize); + long link = PageUtils.getLong(srcPageAddr, srcOff + payloadSize); + + assert link != 0; + + int dstOff = offset(dstIdx); + + PageUtils.putBytes(dstPageAddr, dstOff, payload); + + H2IOUtils.store(dstPageAddr, dstOff + payloadSize, srcIo, srcPageAddr, srcIdx, storeMvccInfo()); + } + + /** {@inheritDoc} */ + @Override public final GridH2SearchRow getLookupRow(BPlusTree<GridH2SearchRow, ?> tree, long pageAddr, int idx) + throws IgniteCheckedException { + long link = getLink(pageAddr, idx); + + if (storeMvccInfo()) { + long mvccCrdVer = getMvccCoordinatorVersion(pageAddr, idx); + long mvccCntr = getMvccCounter(pageAddr, idx); + int mvccOpCntr = getMvccOperationCounter(pageAddr, idx); + + return ((H2Tree)tree).createRowFromLink(link, mvccCrdVer, mvccCntr, mvccOpCntr); + } + + return ((H2Tree)tree).createRowFromLink(link); + } + + /** {@inheritDoc} */ + @Override public final long getLink(long pageAddr, int idx) { + return PageUtils.getLong(pageAddr, offset(idx) + payloadSize); + } +}