ignite-sql - wip
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c7be2f7c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c7be2f7c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c7be2f7c Branch: refs/heads/ignite-sql Commit: c7be2f7cf27916ac815aea5797751cd3af1ad45b Parents: 4f5b36b Author: S.Vladykin <[email protected]> Authored: Wed Feb 4 10:16:04 2015 +0300 Committer: S.Vladykin <[email protected]> Committed: Wed Feb 4 10:16:04 2015 +0300 ---------------------------------------------------------------------- .../ignite/cache/query/QuerySqlPredicate.java | 34 +++++++++++--- .../processors/cache/IgniteCacheProxy.java | 11 +++-- .../processors/cache/QueryCursorImpl.java | 19 ++++---- .../processors/query/GridQueryIndexing.java | 4 +- .../processors/query/GridQueryProcessor.java | 20 ++++++++ .../processors/query/h2/IgniteH2Indexing.java | 48 +++++++++++++++++++- 6 files changed, 113 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7be2f7c/modules/core/src/main/java/org/apache/ignite/cache/query/QuerySqlPredicate.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/QuerySqlPredicate.java b/modules/core/src/main/java/org/apache/ignite/cache/query/QuerySqlPredicate.java index 921f384..c61dd07 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/QuerySqlPredicate.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/QuerySqlPredicate.java @@ -25,7 +25,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; */ public final class QuerySqlPredicate extends QueryPredicate { /** */ - private Class<?> type; + private String type; /** SQL clause. */ private String sql; @@ -48,9 +48,22 @@ public final class QuerySqlPredicate extends QueryPredicate { * @param args Arguments. */ public QuerySqlPredicate(Class<?> type, String sql, Object... args) { - this.type = type; - this.sql = sql; - this.args = args; + setType(type); + setSql(sql); + setArgs(args); + } + + /** + * Constructs SQL predicate with given type, SQL clause and arguments. + * + * @param type Type to query in cache. + * @param sql SQL clause. + * @param args Arguments. + */ + public QuerySqlPredicate(String type, String sql, Object... args) { + setType(type); + setSql(sql); + setArgs(args); } /** @@ -60,7 +73,7 @@ public final class QuerySqlPredicate extends QueryPredicate { * @param args Arguments. */ public QuerySqlPredicate(String sql, Object... args) { - this(null, sql, args); + this((String)null, sql, args); } /** @@ -104,7 +117,7 @@ public final class QuerySqlPredicate extends QueryPredicate { * * @return Type. */ - public Class<?> getType() { + public String getType() { return type; } @@ -113,10 +126,17 @@ public final class QuerySqlPredicate extends QueryPredicate { * * @param type Type. */ - public void setType(Class<?> type) { + public void setType(String type) { this.type = type; } + /** + * @param type Type. + */ + public void setType(Class<?> type) { + this.type = type == null ? null : type.getName(); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(QuerySqlPredicate.class, this); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7be2f7c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 914639c..55140e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -242,10 +242,9 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach try { if (filter instanceof QuerySqlPredicate) { - // TODO query over entries on indexing - ctx.kernalContext().query().query() + QuerySqlPredicate p = (QuerySqlPredicate)filter; - return null; + return ctx.kernalContext().query().queryTwoStep(ctx.name(), p.getType(), p.getSql(), p.getArgs()); } final CacheQuery<Map.Entry<K,V>> qry; @@ -279,7 +278,11 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach if (!onHasNext()) throw new NoSuchElementException(); - return new CacheEntryImpl<>(cur.getKey(), cur.getValue()); + Map.Entry<K,V> e = cur; + + cur = null; + + return new CacheEntryImpl<>(e.getKey(), e.getValue()); } @Override protected boolean onHasNext() throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7be2f7c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java index 0c04c1e..cc1af78 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.query.*; -import org.apache.ignite.internal.util.lang.*; import java.util.*; @@ -28,7 +27,7 @@ import java.util.*; */ public class QueryCursorImpl<T> implements QueryCursor<T> { /** */ - private GridCloseableIterator<T> iter; + private Iterator<T> iter; /** */ private boolean iterTaken; @@ -36,7 +35,7 @@ public class QueryCursorImpl<T> implements QueryCursor<T> { /** * @param iter Iterator. */ - public QueryCursorImpl(GridCloseableIterator<T> iter) { + public QueryCursorImpl(Iterator<T> iter) { this.iter = iter; } @@ -70,16 +69,18 @@ public class QueryCursorImpl<T> implements QueryCursor<T> { /** {@inheritDoc} */ @Override public void close() { - GridCloseableIterator<T> i; + Iterator<T> i; if ((i = iter) != null) { iter = null; - try { - i.close(); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); + if (i instanceof AutoCloseable) { + try { + ((AutoCloseable)i).close(); + } + catch (Exception e) { + throw new IgniteException(e); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7be2f7c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index d09f1cf..5a5d09a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@ -77,7 +77,7 @@ public interface GridQueryIndexing { * @param params Parameters. * @return Cursor. */ - public QueryCursor<Cache.Entry<?,?>> queryTwoStep(String space, String type, String sqlQry, Object[] params); + public <K,V> QueryCursor<Cache.Entry<K,V>> queryTwoStep(String space, String type, String sqlQry, Object[] params); /** * Queries individual fields (generally used by JDBC drivers). @@ -89,7 +89,7 @@ public interface GridQueryIndexing { * @return Query result. * @throws IgniteCheckedException If failed. */ - public <K, V> GridQueryFieldsResult queryFields(@Nullable String spaceName, String qry, + public GridQueryFieldsResult queryFields(@Nullable String spaceName, String qry, Collection<Object> params, IndexingQueryFilter filters) throws IgniteCheckedException; /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7be2f7c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 6702eda..7c4ec81 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -38,6 +38,7 @@ import org.apache.ignite.spi.indexing.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import javax.cache.*; import java.lang.reflect.*; import java.util.*; import java.util.concurrent.*; @@ -474,6 +475,25 @@ public class GridQueryProcessor extends GridProcessorAdapter { /** * @param space Space. + * @param type Type. + * @param sqlQry Query. + * @param params Parameters. + * @return Cursor. + */ + public <K,V> QueryCursor<Cache.Entry<K,V>> queryTwoStep(String space, String type, String sqlQry, Object[] params) { + if (!busyLock.enterBusy()) + throw new IllegalStateException("Failed to execute query (grid is stopping)."); + + try { + return idx.queryTwoStep(space, type, sqlQry, params); + } + finally { + busyLock.leaveBusy(); + } + } + + /** + * @param space Space. * @param key Key. * @throws IgniteCheckedException Thrown in case of any errors. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7be2f7c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 8c856c0..f2190ab 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -55,6 +55,7 @@ import org.h2.value.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import javax.cache.Cache; import java.io.*; import java.lang.reflect.*; import java.math.*; @@ -535,7 +536,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public <K, V> GridQueryFieldsResult queryFields(@Nullable final String spaceName, final String qry, + @Override public GridQueryFieldsResult queryFields(@Nullable final String spaceName, final String qry, @Nullable final Collection<Object> params, final IndexingQueryFilter filters) throws IgniteCheckedException { setFilters(filters); @@ -761,6 +762,51 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <K, V> QueryCursor<Cache.Entry<K,V>> queryTwoStep(String space, String type, String sqlQry, + Object[] params) { + TableDescriptor tblDesc = tableDescriptor(type, space); + + if (tblDesc == null) + return new QueryCursorImpl<>(Collections.<Cache.Entry<K,V>>emptyIterator()); + + String qry; + + try { + qry = generateQuery(sqlQry, tblDesc); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + + final QueryCursor<List<?>> res = queryTwoStep(space, qry, params); + + final Iterator<List<?>> iter0 = res.iterator(); + + Iterator<Cache.Entry<K,V>> iter = new Iterator<Cache.Entry<K,V>>() { + @Override public boolean hasNext() { + return iter0.hasNext(); + } + + @Override public Cache.Entry<K,V> next() { + List<?> l = iter0.next(); + + return new CacheEntryImpl<>((K)l.get(0),(V)l.get(1)); + } + + @Override public void remove() { + throw new UnsupportedOperationException(); + } + }; + + return new QueryCursorImpl<Cache.Entry<K,V>>(iter) { + @Override public void close() { + res.close(); + } + }; + } + + /** {@inheritDoc} */ @Override public QueryCursor<List<?>> queryTwoStep(String space, String sqlQry, Object[] params) { Connection c = connectionForSpace(space);
