Heri Ramampiaro has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/378
Change subject: Added udf tutorial, fixed inconsistency and UTF String bugs
......................................................................
Added udf tutorial, fixed inconsistency and UTF String bugs
Change-Id: Ibdb5074d0e0d4fb2b7d4303aa405c9fc90f4bd09
---
A asterix-doc/src/site/markdown/udf.md
M asterix-doc/src/site/site.xml
M asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
M asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java
M asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetProcessor.java
M asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFunction.java
6 files changed, 432 insertions(+), 36 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/78/378/1
diff --git a/asterix-doc/src/site/markdown/udf.md b/asterix-doc/src/site/markdown/udf.md
new file mode 100644
index 0000000..229c3c3
--- /dev/null
+++ b/asterix-doc/src/site/markdown/udf.md
@@ -0,0 +1,288 @@
+# Support for User Defined Functions in AsterixDB #
+
+## <a id="toc">Table of Contents</a> ##
+* [Using UDF to preprocess feed-collected data](#PreprocessingCollectedData)
+* [Writing an External UDF](#WritingAnExternalUDF)
+* [Creating an AsterixDB Library](#CreatingAnAsterixDBLibrary)
+* [Installing an AsterixDB Library](#installingUDF)
+
+This document describes how to implement, use, and install user-defined functions (UDFs) in
+AsterixDB. As a running example, we use UDFs to preprocess data collected using
+feeds (see the [feeds tutorial](feeds/tutorial.html)).
+
+## <a id="PreprocessingCollectedData">Preprocessing Collected Data</a> ##
+
+In the following, we assume that you have already created the `TwitterFeed` feed and its corresponding
+data types and dataset by following the instructions in the [feeds tutorial](feeds/tutorial.html).
+
+A feed definition may optionally include the specification of a
+user-defined function that is to be applied to each feed record prior
+to persistence. Examples of pre-processing might include adding
+attributes, filtering out records, sampling, sentiment analysis, feature
+extraction, etc. Such pre-processing can be expressed as a UDF, defined either in AQL or in a programming
+language such as Java. An AQL UDF is a good fit when
+pre-processing a record requires the result of a query (join or aggregate)
+over data contained in AsterixDB datasets. More sophisticated
+processing such as sentiment analysis of text is better handled
+by providing a Java UDF. A Java UDF has an initialization phase
+that allows the UDF to access any resources it may need to initialize
+itself prior to being used in a data flow. It is assumed by the
+AsterixDB compiler to be stateless and thus usable as an embarrassingly
+parallel black box. In contrast, the AsterixDB compiler can
+reason about an AQL UDF and involve the use of indexes during
+its invocation.
+
+We consider an example transformation of a raw tweet into its
+lightweight version called `ProcessedTweet`, which is defined next.
+
+ use dataverse feeds;
+
+ create type ProcessedTweet if not exists as open {
+ id: string,
+ user_name:string,
+ location:point,
+ created_at:string,
+ message_text:string,
+ country: string,
+ topics: {{string}}
+ };
+
+ create dataset ProcessedTweets(ProcessedTweet)
+ primary key id;
+
+Transforming a collected tweet into its lighter version of type `ProcessedTweet` involves extracting the topics or hashtags (if any) in the tweet
+and collecting them in the "topics" attribute of the tweet.
+Additionally, the latitude and longitude values (doubles) are combined into the spatial point type. Note that spatial data types are first-class citizens that come with support for creating indexes. Next, we show a revised version of our example TwitterFeed that involves the use of a UDF. We assume that the UDF containing the logic for the transformation into a "ProcessedTweet" is available as a Java UDF inside an AsterixDB library named 'testlib'. We defer the writing of a Java UDF and its installation as part of an AsterixDB library to a later section of this document.
+
+ use dataverse feeds;
+
+ create feed ProcessedTwitterFeed if not exists
+ using "push_twitter"
+ (("type-name"="Tweet"),
+ ("consumer.key"="************"),
+ ("consumer.secret"="**************"),
+ ("access.token"="**********"),
+ ("access.token.secret"="*************"))
+
+ apply function testlib#addHashTagsInPlace;
+
+Note that a feed adaptor and a UDF act as pluggable components. These
+contribute towards providing a generic "plug-and-play" model where
+custom implementations can be provided to cater to specific requirements.
+
+#### Building a Cascade Network of Feeds ####
+Multiple high-level applications may wish to consume the data
+ingested from a data feed. Each such application might perceive the
+feed in a different way and require the arriving data to be processed
+and/or persisted differently. Building a separate flow of data from
+the external source for each application is wasteful of resources as
+the pre-processing or transformations required by each application
+might overlap and could be done together in an incremental fashion
+to avoid redundancy. A single flow of data from the external source
+could provide data for multiple applications. To achieve this, we
+introduce the notion of primary and secondary feeds in AsterixDB.
+
+A feed in AsterixDB is considered to be a primary feed if it gets
+its data from an external data source. The records contained in a
+feed (subsequent to any pre-processing) are directed to a designated
+AsterixDB dataset. Alternatively or additionally, these records can
+be used to derive other feeds known as secondary feeds. A secondary
+feed is similar to its parent feed in every other aspect; it can
+have an associated UDF to allow for any subsequent processing,
+can be persisted into a dataset, and/or can be made to derive other
+secondary feeds to form a cascade network. A primary feed and a
+dependent secondary feed form a hierarchy. As an example, the following
+AQL statement redefines the previous feed
+"ProcessedTwitterFeed" as a secondary feed of its
+parent feed (TwitterFeed).
+
+ use dataverse feeds;
+
+ drop feed ProcessedTwitterFeed if exists;
+
+ create secondary feed ProcessedTwitterFeed from feed TwitterFeed
+ apply function testlib#addHashTags;
+
+The `addHashTags` function is already provided in the release. Below,
+we explain how this function or other functions can be implemented and added to the system.
+
+## <a id="WritingAnExternalUDF">Writing an External UDF</a> ##
+
+A Java UDF in AsterixDB is required to implement an interface (`IExternalScalarFunction` for the scalar function shown here). We shall next write a basic UDF that extracts the
+hashtags contained in the tweet's text and appends each to an unordered list. The list is added as an additional
+attribute to the tweet to form the augmented version, `ProcessedTweet`.
+
+ package org.apache.asterix.external.library;
+
+ import org.apache.asterix.external.library.java.JObjects.JDouble;
+ import org.apache.asterix.external.library.java.JObjects.JPoint;
+ import org.apache.asterix.external.library.java.JObjects.JRecord;
+ import org.apache.asterix.external.library.java.JObjects.JString;
+ import org.apache.asterix.external.library.java.JObjects.JUnorderedList;
+ import org.apache.asterix.external.library.java.JTypeTag;
+ import org.apache.asterix.external.util.Datatypes;
+
+ public class AddHashTagsFunction implements IExternalScalarFunction {
+
+ private JUnorderedList list = null;
+ private JPoint location = null;
+
+ @Override
+ public void initialize(IFunctionHelper functionHelper) {
+ list = new
JUnorderedList(functionHelper.getObject(JTypeTag.STRING));
+ location = new JPoint(0, 0);
+ }
+
+ @Override
+ public void deinitialize() {
+ }
+
+ @Override
+ public void evaluate(IFunctionHelper functionHelper) throws Exception {
+ list.clear();
+ JRecord inputRecord = (JRecord) functionHelper.getArgument(0);
+ JString text = (JString)
inputRecord.getValueByName(Datatypes.Tweet.MESSAGE);
+ JDouble latitude = (JDouble)
inputRecord.getValueByName(Datatypes.Tweet.LATITUDE);
+ JDouble longitude = (JDouble)
inputRecord.getValueByName(Datatypes.Tweet.LONGITUDE);
+
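+ if (latitude != null && longitude != null) {
+ location.setValue(latitude.getValue(), longitude.getValue());
+ } else {
+ // Reset the reused point so a tweet without coordinates does not inherit the previous tweet's location.
+ location.setValue(0, 0);
+ }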
+ String[] tokens = text.getValue().split(" ");
+ for (String tk : tokens) {
+ if (tk.startsWith("#")) {
+ JString newField = (JString)
functionHelper.getObject(JTypeTag.STRING);
+ newField.setValue(tk);
+ list.add(newField);
+ }
+ }
+
+ JRecord outputRecord = (JRecord) functionHelper.getResultObject();
+ outputRecord.setField(Datatypes.Tweet.ID,
inputRecord.getValueByName(Datatypes.Tweet.ID));
+
+ JRecord userRecord = (JRecord)
inputRecord.getValueByName(Datatypes.Tweet.USER);
+ outputRecord.setField(Datatypes.ProcessedTweet.USER_NAME,
+ userRecord.getValueByName(Datatypes.Tweet.SCREEN_NAME));
+
+ outputRecord.setField(Datatypes.ProcessedTweet.LOCATION, location);
+ outputRecord.setField(Datatypes.Tweet.CREATED_AT,
inputRecord.getValueByName(Datatypes.Tweet.CREATED_AT));
+ outputRecord.setField(Datatypes.Tweet.MESSAGE, text);
+ outputRecord.setField(Datatypes.ProcessedTweet.TOPICS, list);
+
+ inputRecord.addField(Datatypes.ProcessedTweet.TOPICS, list);
+ functionHelper.setResult(outputRecord);
+ }
+
+ }
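
The tokenization step in `evaluate` can be tried outside AsterixDB. The following is a minimal sketch in plain Java, assuming space-delimited tokens as in the listing above. The class `HashTagExtractor` and its `extract` method are hypothetical names used only for illustration; they are not part of the AsterixDB API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of AsterixDB: mirrors the token loop in evaluate().
final class HashTagExtractor {

    // Returns every space-separated token that starts with '#', in order of appearance.
    static List<String> extract(String text) {
        List<String> topics = new ArrayList<>();
        for (String tk : text.split(" ")) {
            if (tk.startsWith("#")) {
                topics.add(tk);
            }
        }
        return topics;
    }

    public static void main(String[] args) {
        // Print the topics found in a sample message.
        System.out.println(extract("Loving #AsterixDB and #BigData today"));
    }
}
```

Like the listing above, this sketch keeps the leading `#` in each topic and would also accept a bare `#` token; a production UDF may want to strip punctuation or normalize case first.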
+
+A Java UDF requires an associated factory class, which AsterixDB uses to create an instance of the function at runtime. The corresponding factory class is given below.
+
+ package org.apache.asterix.external.library;
+
+ import org.apache.asterix.external.library.IExternalScalarFunction;
+ import org.apache.asterix.external.library.IFunctionFactory;
+
+ public class AddHashTagsFunctionFactory implements IFunctionFactory {
+
+ @Override
+ public IExternalScalarFunction getExternalFunction() {
+ return new AddHashTagsFunction();
+ }
+ }
+
+At this stage, we shall compile the above two source files. To do so, we need the following jars on the classpath.
+
+ asterix-common-0.8.7-SNAPSHOT.jar
+ asterix-external-data-0.8.7-SNAPSHOT.jar
+
+## <a id="CreatingAnAsterixDBLibrary">Creating an AsterixDB Library</a> ##
+
+We need to install our Java UDF so that we may use it in AQL statements/queries. To do so, we package it as an AsterixDB library, which has the following pre-defined structure.
+
+
+ - A **jar** file, which contains the class files for your UDF source code.
+
+ - File `descriptor.xml`, which is a descriptor with meta-information about
the library.
+
+ <externalLibrary xmlns="library">
+ <language>JAVA</language>
+ <libraryFunctions>
+ <libraryFunction>
+ <function_type>SCALAR</function_type>
+ <name>addHashTags</name>
+ <arguments>Tweet</arguments>
+ <return_type>ProcessedTweet</return_type>
+ <definition>org.apache.asterix.external.library.AddHashTagsFunctionFactory
+ </definition>
+ </libraryFunction>
+ </libraryFunctions>
+ </externalLibrary>
+
+
+- lib: other dependency jars
+
+If the Java UDF requires additional dependency jars, add them under a "lib" folder.
+
+We create a zip bundle that contains the jar file and the library descriptor
xml file. The zip would have the following structure.
+
+ $ unzip -l ./tweetlib.zip
+ Archive: ./tweetlib.zip
+
+ Length Date Time Name
+ -------- ---- ---- ----
+ 760817 04-23-14 17:16 hash-tags.jar
+ 405 04-23-14 17:16 tweet.xml
+ -------- -------
+ 761222 2 files
+
+## <a name="installingUDF">Installing an AsterixDB Library</a> ##
+
+We assume you have followed the [installation instructions](../install.html) to set up a running AsterixDB instance. Let us refer to your AsterixDB instance by the name "my_asterix".
+
+- Step 1: Stop the AsterixDB instance if it is in the ACTIVE state.
+
+ $ managix stop -n my_asterix
+
+
+- Step 2: Install the library using the Managix install command. To illustrate, we first use the help command to look up the syntax:
+
+ $ managix help -cmd install
+ Installs a library to an asterix instance.
+ Options
+ n Name of Asterix Instance
+ d Name of the dataverse under which the library will be installed
+ l Name of the library
+ p Path to library zip bundle
+
+
+The sample output above explains the usage and the required parameters. Each library has a name and is installed under a dataverse. Recall that we created a dataverse named "feeds" prior to creating our datatypes and dataset. We shall name our library "testlib".
+
+We assume you have a library zip bundle that needs to be installed.
+To install the library, use the Managix install command. An example is shown
below.
+
+ $ managix install -n my_asterix -d feeds -l testlib -p <put the absolute path of the library zip bundle here>
+
+You should see the following message:
+
+ INFO: Installed library testlib
+
+We shall next start our AsterixDB instance using the start command as shown
below.
+
+ $ managix start -n my_asterix
+
+You may now use the AsterixDB library in AQL statements and queries. To look at the installed artifacts, you may execute the following queries at the AsterixDB web-console.
+
+ for $x in dataset Metadata.Function
+ return $x
+
+ for $x in dataset Metadata.Library
+ return $x
+
+Our library is now installed and is ready to be used.
+
+To uninstall a library, use the Managix uninstall command as follows:
+
+ $ managix stop -n my_asterix
+
+ $ managix uninstall -n my_asterix -d feeds -l testlib
+
diff --git a/asterix-doc/src/site/site.xml b/asterix-doc/src/site/site.xml
index 37a0e53..5fb4986 100644
--- a/asterix-doc/src/site/site.xml
+++ b/asterix-doc/src/site/site.xml
@@ -82,6 +82,7 @@
<item name="AQL Support of Similarity Queries"
href="aql/similarity.html"/>
<item name="Accessing External Data" href="aql/externaldata.html"/>
<item name="Support for Data Ingestion in AsterixDB"
href="feeds/tutorial.html" />
+ <item name="Support for User Defined Functions in AsterixDB"
href="udf.html" />
<item name="Filter-Based LSM Index Acceleration"
href="aql/filters.html"/>
<item name="HTTP API to AsterixDB" href="api.html"/>
</menu>
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
index 6ab12f6..aba0ec9 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
@@ -14,11 +14,6 @@
*/
package org.apache.asterix.external.library.java;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.util.LinkedHashMap;
-import java.util.List;
-
import org.apache.asterix.common.exceptions.AsterixException;
import
org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
import
org.apache.asterix.dataflow.data.nontagged.serde.ACircleSerializerDeserializer;
@@ -40,6 +35,7 @@
import
org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
import
org.apache.asterix.dataflow.data.nontagged.serde.ATimeSerializerDeserializer;
import org.apache.asterix.external.library.TypeInfo;
+import
org.apache.asterix.external.library.java.JObjects.ByteArrayAccessibleInputStream;
import org.apache.asterix.external.library.java.JObjects.JBoolean;
import org.apache.asterix.external.library.java.JObjects.JByte;
import org.apache.asterix.external.library.java.JObjects.JCircle;
@@ -82,8 +78,19 @@
import org.apache.asterix.om.util.container.IObjectPool;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.List;
public class JObjectAccessors {
+ private static ByteArrayAccessibleOutputStream baaos = new
ByteArrayAccessibleOutputStream();
+ private static ByteArrayAccessibleInputStream baais = new
ByteArrayAccessibleInputStream(baaos.getByteArray(), 0, 0);
+ private static DataInputStream dis = new DataInputStream(baais);
+
public static IJObjectAccessor createFlatJObjectAccessor(ATypeTag
aTypeTag) {
IJObjectAccessor accessor = null;
@@ -224,18 +231,22 @@
@Override
public IJObject access(IVisitablePointable pointable,
IObjectPool<IJObject, IAType> objectPool)
throws HyracksDataException {
- byte[] b = pointable.getByteArray();
- int s = pointable.getStartOffset();
- int l = pointable.getLength();
-
- String v = null;
- v = AStringSerializerDeserializer.INSTANCE.deserialize(
- new DataInputStream(new ByteArrayInputStream(b, s+1,
l-1))).getStringValue();
- //v = new String(b, s+1, l, "UTF-8");
- JObjectUtil.getNormalizedString(v);
-
IJObject jObject = objectPool.allocate(BuiltinType.ASTRING);
- ((JString) jObject).setValue(JObjectUtil.getNormalizedString(v));
+
+ try {
+ byte byteArray[] = pointable.getByteArray();
+ int len = pointable.getLength();
+ int off = pointable.getStartOffset()+1;
+ baaos.reset();
+ if(off >= 0 && off <= byteArray.length && len >= 0 && off +
len - byteArray.length <= 0) {
+ baaos.write(byteArray, off, len);
+ ((JString)
jObject).setValue(JObjectUtil.getNormalizedString(baaos.toString("UTF-8")));
+ } else {
+ ((JString) jObject).setValue("");
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
return jObject;
}
}
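
The bounds check in the `access` method above can be illustrated in isolation. The following is a minimal sketch in plain Java, assuming the slice contains only the UTF-8 payload (any type tag or length prefix in the serialized form would have to be skipped by the caller). The class `Utf8SliceDecoder` is a hypothetical name used only for illustration and is not part of AsterixDB or Hyracks.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of AsterixDB: decodes a slice of a byte array defensively.
final class Utf8SliceDecoder {

    // Decodes bytes[off, off + len) as UTF-8, or returns "" when the slice is out of bounds.
    static String decode(byte[] bytes, int off, int len) {
        // Written as "off > length - len" so the check cannot overflow for large off or len.
        if (off < 0 || len < 0 || off > bytes.length - len) {
            return "";
        }
        return new String(bytes, off, len, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] data = "xh\u00e9#".getBytes(StandardCharsets.UTF_8);
        // Skip the first byte and decode the next three bytes ('h' plus the two-byte 'é').
        System.out.println(decode(data, 1, 3));
    }
}
```

Passing a `Charset` instead of the charset name avoids the checked `UnsupportedEncodingException`, so no `try`/`catch` is needed around the decoding in this sketch.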
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java
index 4ee7975..f66f912 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java
@@ -16,7 +16,7 @@
public class Datatypes {
- public static final class Tweet {
+ /*public static final class Tweet {
public static final String ID = "id";
public static final String USER = "user";
public static final String MESSAGE = "message_text";
@@ -25,11 +25,69 @@
public static final String CREATED_AT = "created_at";
public static final String SCREEN_NAME = "screen_name";
public static final String COUNTRY = "country";
+ }*/
+
+ /*
+
+ The following assumes this DDL (ignoring the order of the fields):
+
+ create type TwitterUser if not exists as open{
+ screen_name: string,
+ language: string,
+ friends_count: int32,
+ status_count: int32,
+ name: string,
+ followers_count: string
+ };
+
+ create type Tweet if not exists as open{
+ id: string,
+ user: TwitterUser,
+ latitude:double,
+ longitude:double,
+ created_at:string,
+ message_text:string
+ };
+
+*/
+ public static class Tweet {
+ public static final String ID = "id";
+ public static final String USER = "user";
+ public static final String LATITUDE = "latitude";
+ public static final String LONGITUDE = "longitude";
+ public static final String CREATED_AT = "created_at";
+ public static final String MESSAGE = "message_text";
+
+ public static final String COUNTRY = "country";
+
+ // User fields (for the sub-record "user")
+ public static final String SCREEN_NAME = "screen_name";
+ public static final String LANGUAGE = "language";
+ public static final String FRIENDS_COUNT = "friends_count";
+ public static final String STATUS_COUNT = "status_count";
+ public static final String NAME = "name";
+ public static final String FOLLOWERS_COUNT = "followers_count";
+
}
+
+ /*
+
+ create type ProcessedTweet if not exists as open {
+ id: string,
+ user_name:string,
+ location:point,
+ created_at:string,
+ message_text:string,
+ country: string,
+ topics: {{string}}
+ };
+ */
public static final class ProcessedTweet {
public static final String USER_NAME = "user_name";
public static final String LOCATION = "location";
public static final String TOPICS = "topics";
}
+
+
}
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetProcessor.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetProcessor.java
index 971e6a2..c98abd5 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetProcessor.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/util/TweetProcessor.java
@@ -15,14 +15,20 @@
package org.apache.asterix.external.util;
import org.apache.asterix.external.library.java.JObjectUtil;
-import twitter4j.Status;
-import twitter4j.User;
+import org.apache.asterix.external.util.Datatypes.Tweet;
import org.apache.asterix.om.base.AMutableDouble;
import org.apache.asterix.om.base.AMutableInt32;
import org.apache.asterix.om.base.AMutableRecord;
import org.apache.asterix.om.base.AMutableString;
import org.apache.asterix.om.base.IAObject;
import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import twitter4j.Status;
+import twitter4j.User;
+
+import java.util.HashMap;
+import java.util.Map;
public class TweetProcessor {
@@ -31,10 +37,15 @@
private AMutableRecord mutableRecord;
private AMutableRecord mutableUser;
+ private final Map<String, Integer> userFieldNameMap = new HashMap<>();
+ private final Map<String, Integer> tweetFieldNameMap = new HashMap<>();
+
+
public TweetProcessor(ARecordType recordType) {
+ initFieldNames(recordType);
mutableUserFields = new IAObject[] { new AMutableString(null), new
AMutableString(null), new AMutableInt32(0),
new AMutableInt32(0), new AMutableString(null), new
AMutableInt32(0) };
- mutableUser = new AMutableRecord((ARecordType)
recordType.getFieldTypes()[1], mutableUserFields);
+ mutableUser = new AMutableRecord((ARecordType)
recordType.getFieldTypes()[tweetFieldNameMap.get(Tweet.USER)],
mutableUserFields);
mutableTweetFields = new IAObject[] { new AMutableString(null),
mutableUser, new AMutableDouble(0),
new AMutableDouble(0), new AMutableString(null), new
AMutableString(null) };
@@ -42,32 +53,56 @@
}
+ // Initialize the hashmap values for the field names and positions
+ private void initFieldNames(ARecordType recordType) {
+ String tweetFields[] = recordType.getFieldNames();
+ for (int i=0; i<tweetFields.length; i++) {
+ tweetFieldNameMap.put(tweetFields[i], i);
+ if (tweetFields[i].equals(Tweet.USER)) {
+ IAType fieldType = recordType.getFieldTypes()[i];
+ if (fieldType.getTypeTag() == ATypeTag.RECORD) {
+ String userFields[] =
((ARecordType)fieldType).getFieldNames();
+ for (int j=0; j<userFields.length; j++) {
+ userFieldNameMap.put(userFields[j], j);
+ }
+ }
+
+ }
+ }
+ }
+
+
public AMutableRecord processNextTweet(Status tweet) {
User user = tweet.getUser();
- ((AMutableString)
mutableUserFields[0]).setValue(JObjectUtil.getNormalizedString(user.getScreenName()));
- ((AMutableString)
mutableUserFields[1]).setValue(JObjectUtil.getNormalizedString(user.getLang()));
- ((AMutableInt32)
mutableUserFields[2]).setValue(user.getFriendsCount());
- ((AMutableInt32)
mutableUserFields[3]).setValue(user.getStatusesCount());
- ((AMutableString)
mutableUserFields[4]).setValue(JObjectUtil.getNormalizedString(user.getName()));
- ((AMutableInt32)
mutableUserFields[5]).setValue(user.getFollowersCount());
- ((AMutableString) mutableTweetFields[0]).setValue(tweet.getId() + "");
+ // Tweet user data
+ ((AMutableString)
mutableUserFields[userFieldNameMap.get(Tweet.SCREEN_NAME)]).setValue(JObjectUtil.getNormalizedString(user.getScreenName()));
+ ((AMutableString)
mutableUserFields[userFieldNameMap.get(Tweet.LANGUAGE)]).setValue(JObjectUtil.getNormalizedString(user.getLang()));
+ ((AMutableInt32)
mutableUserFields[userFieldNameMap.get(Tweet.FRIENDS_COUNT)]).setValue(user.getFriendsCount());
+ ((AMutableInt32)
mutableUserFields[userFieldNameMap.get(Tweet.STATUS_COUNT)]).setValue(user.getStatusesCount());
+ ((AMutableString)
mutableUserFields[userFieldNameMap.get(Tweet.NAME)]).setValue(JObjectUtil.getNormalizedString(user.getName()));
+ ((AMutableInt32)
mutableUserFields[userFieldNameMap.get(Tweet.FOLLOWERS_COUNT)]).setValue(user.getFollowersCount());
- for (int i = 0; i < 6; i++) {
- ((AMutableRecord) mutableTweetFields[1]).setValueAtPos(i,
mutableUserFields[i]);
+
+ // Tweet data
+ ((AMutableString)
mutableTweetFields[tweetFieldNameMap.get(Tweet.ID)]).setValue(String.valueOf(tweet.getId()));
+
+ int userPos = tweetFieldNameMap.get(Tweet.USER);
+ for (int i = 0; i < mutableUserFields.length; i++) {
+ ((AMutableRecord) mutableTweetFields[userPos]).setValueAtPos(i,
mutableUserFields[i]);
}
if (tweet.getGeoLocation() != null) {
- ((AMutableDouble)
mutableTweetFields[2]).setValue(tweet.getGeoLocation().getLatitude());
- ((AMutableDouble)
mutableTweetFields[3]).setValue(tweet.getGeoLocation().getLongitude());
+ ((AMutableDouble)
mutableTweetFields[tweetFieldNameMap.get(Tweet.LATITUDE)]).setValue(tweet.getGeoLocation().getLatitude());
+ ((AMutableDouble)
mutableTweetFields[tweetFieldNameMap.get(Tweet.LONGITUDE)]).setValue(tweet.getGeoLocation().getLongitude());
} else {
- ((AMutableDouble) mutableTweetFields[2]).setValue(0);
- ((AMutableDouble) mutableTweetFields[3]).setValue(0);
+ ((AMutableDouble)
mutableTweetFields[tweetFieldNameMap.get(Tweet.LATITUDE)]).setValue(0);
+ ((AMutableDouble)
mutableTweetFields[tweetFieldNameMap.get(Tweet.LONGITUDE)]).setValue(0);
}
- ((AMutableString)
mutableTweetFields[4]).setValue(JObjectUtil.getNormalizedString(
+ ((AMutableString)
mutableTweetFields[tweetFieldNameMap.get(Tweet.CREATED_AT)]).setValue(JObjectUtil.getNormalizedString(
tweet.getCreatedAt().toString()));
- ((AMutableString)
mutableTweetFields[5]).setValue(JObjectUtil.getNormalizedString(tweet.getText()));
+ ((AMutableString)
mutableTweetFields[tweetFieldNameMap.get(Tweet.MESSAGE)]).setValue(JObjectUtil.getNormalizedString(tweet.getText()));
- for (int i = 0; i < 6; i++) {
+ for (int i = 0; i < mutableTweetFields.length; i++) {
mutableRecord.setValueAtPos(i, mutableTweetFields[i]);
}
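
The name-to-position lookup that `initFieldNames` introduces can be sketched without any AsterixDB types. The following is a minimal example in plain Java, assuming the field names arrive as a `String[]` in declaration order (as `getFieldNames()` is used above). The class `FieldPositions` is a hypothetical name used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of AsterixDB: maps each field name to its position.
final class FieldPositions {

    // Builds a lookup from field name to its index in the declared order.
    static Map<String, Integer> index(String[] fieldNames) {
        Map<String, Integer> positions = new HashMap<>();
        for (int i = 0; i < fieldNames.length; i++) {
            positions.put(fieldNames[i], i);
        }
        return positions;
    }

    public static void main(String[] args) {
        // The same field resolves correctly under two different declaration orders.
        String[] orderA = { "id", "user", "latitude", "longitude" };
        String[] orderB = { "user", "id", "longitude", "latitude" };
        System.out.println(index(orderA).get("user"));
        System.out.println(index(orderB).get("user"));
    }
}
```

Looking positions up by name is what lets the record be filled in correctly regardless of the order in which the DDL declares the fields. Note that `get` returns `null` for a name missing from the type, so a caller that unboxes the result directly would fail with a `NullPointerException`; checking for a missing field first is a reasonable precaution.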
diff --git
a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFunction.java
b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFunction.java
index dc75c57..bd6518b 100644
---
a/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFunction.java
+++
b/asterix-external-data/src/test/java/org/apache/asterix/external/library/AddHashTagsFunction.java
@@ -47,7 +47,10 @@
if (latitude != null && longitude != null) {
location.setValue(latitude.getValue(), longitude.getValue());
+ } else {
+ location.setValue(0, 0);
}
+
String[] tokens = text.getValue().split(" ");
for (String tk : tokens) {
if (tk.startsWith("#")) {
--
To view, visit https://asterix-gerrit.ics.uci.edu/378
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ibdb5074d0e0d4fb2b7d4303aa405c9fc90f4bd09
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Heri Ramampiaro <[email protected]>