Author: kwright
Date: Sun Jan 27 20:43:31 2013
New Revision: 1439174
URL: http://svn.apache.org/viewvc?rev=1439174&view=rev
Log:
Merge in schema change and infrastructure portions of CONNECTORS-552 changes.
Added:
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/ForcedParamManager.java
- copied unchanged from r1439172,
manifoldcf/branches/CONNECTORS-552-2/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/ForcedParamManager.java
Modified:
manifoldcf/trunk/ (props changed)
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/DocumentIngestStatus.java
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/RepositoryDocument.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobDescription.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobDescription.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
Propchange: manifoldcf/trunk/
------------------------------------------------------------------------------
Merged /manifoldcf/branches/CONNECTORS-552-2:r1438584-1439172
Modified:
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
URL:
http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java?rev=1439174&r1=1439173&r2=1439174&view=diff
==============================================================================
---
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
(original)
+++
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
Sun Jan 27 20:43:31 2013
@@ -46,6 +46,7 @@ import java.io.*;
* <tr><td>urihash</td><td>VARCHAR(40)</td><td></td></tr>
* <tr><td>lastversion</td><td>LONGTEXT</td><td></td></tr>
* <tr><td>lastoutputversion</td><td>LONGTEXT</td><td></td></tr>
+* <tr><td>forcedparams</td><td>LONGTEXT</td><td></td></tr>
* <tr><td>changecount</td><td>BIGINT</td><td></td></tr>
* <tr><td>firstingest</td><td>BIGINT</td><td></td></tr>
* <tr><td>lastingest</td><td>BIGINT</td><td></td></tr>
@@ -66,6 +67,7 @@ public class IncrementalIngester extends
protected final static String uriHashField = "urihash";
protected final static String lastVersionField = "lastversion";
protected final static String lastOutputVersionField = "lastoutputversion";
+ protected final static String forcedParamsField = "forcedparams";
protected final static String changeCountField = "changecount";
protected final static String firstIngestField = "firstingest";
protected final static String lastIngestField = "lastingest";
@@ -114,6 +116,7 @@ public class IncrementalIngester extends
map.put(uriHashField,new
ColumnDescription("VARCHAR(40)",false,true,null,null,false));
map.put(lastVersionField,new
ColumnDescription("LONGTEXT",false,true,null,null,false));
map.put(lastOutputVersionField,new
ColumnDescription("LONGTEXT",false,true,null,null,false));
+ map.put(forcedParamsField,new
ColumnDescription("LONGTEXT",false,true,null,null,false));
map.put(changeCountField,new
ColumnDescription("BIGINT",false,false,null,null,false));
map.put(firstIngestField,new
ColumnDescription("BIGINT",false,false,null,null,false));
map.put(lastIngestField,new
ColumnDescription("BIGINT",false,false,null,null,false));
@@ -122,7 +125,14 @@ public class IncrementalIngester extends
}
else
{
- // This is where any schema upgrade code must go, should it be needed.
+ // Schema upgrade from 1.1 to 1.2
+ ColumnDescription cd =
(ColumnDescription)existing.get(forcedParamsField);
+ if (cd == null)
+ {
+ Map<String,ColumnDescription> addMap = new
HashMap<String,ColumnDescription>();
+ addMap.put(forcedParamsField,new
ColumnDescription("LONGTEXT",false,true,null,null,false));
+ performAlter(addMap,null,null,null);
+ }
}
// Now, do indexes
@@ -175,6 +185,7 @@ public class IncrementalIngester extends
/** Flush all knowledge of what was ingested before.
*/
+ @Override
public void clearAll()
throws ManifoldCFException
{
@@ -187,6 +198,7 @@ public class IncrementalIngester extends
*@param mimeType is the mime type to check.
*@return true if the mimeType is indexable.
*/
+ @Override
public boolean checkMimeTypeIndexable(String outputConnectionName, String
outputDescription, String mimeType)
throws ManifoldCFException, ServiceInterruption
{
@@ -211,6 +223,7 @@ public class IncrementalIngester extends
*@param localFile is the local file to check.
*@return true if the local file is indexable.
*/
+ @Override
public boolean checkDocumentIndexable(String outputConnectionName, String
outputDescription, File localFile)
throws ManifoldCFException, ServiceInterruption
{
@@ -236,6 +249,7 @@ public class IncrementalIngester extends
*@param length is the length of the document.
*@return true if the file is indexable.
*/
+ @Override
public boolean checkLengthIndexable(String outputConnectionName, String
outputDescription, long length)
throws ManifoldCFException, ServiceInterruption
{
@@ -261,6 +275,7 @@ public class IncrementalIngester extends
*@param url is the url of the document.
*@return true if the file is indexable.
*/
+ @Override
public boolean checkURLIndexable(String outputConnectionName, String
outputDescription, String url)
throws ManifoldCFException, ServiceInterruption
{
@@ -284,6 +299,7 @@ public class IncrementalIngester extends
*@param spec is the output specification.
*@return the description string.
*/
+ @Override
public String getOutputDescription(String outputConnectionName,
OutputSpecification spec)
throws ManifoldCFException, ServiceInterruption
{
@@ -313,6 +329,7 @@ public class IncrementalIngester extends
*@param recordTime is the time at which the recording took place, in
milliseconds since epoch.
*@param activities is the object used in case a document needs to be removed
from the output index as the result of this operation.
*/
+ @Override
public void documentRecord(String outputConnectionName,
String identifierClass, String identifierHash,
String documentVersion,
@@ -328,7 +345,7 @@ public class IncrementalIngester extends
Logging.ingest.debug("Recording document '"+docKey+"' for output
connection '"+outputConnectionName+"'");
}
-
performIngestion(connection,docKey,documentVersion,null,null,null,recordTime,null,activities);
+
performIngestion(connection,docKey,documentVersion,null,null,null,null,recordTime,null,activities);
}
/** Ingest a document.
@@ -348,6 +365,7 @@ public class IncrementalIngester extends
*@param activities is an object providing a set of methods that the
implementer can use to perform the operation.
*@return true if the ingest was ok, false if the ingest is illegal (and
should not be repeated).
*/
+ @Override
public boolean documentIngest(String outputConnectionName,
String identifierClass, String identifierHash,
String documentVersion,
@@ -358,6 +376,48 @@ public class IncrementalIngester extends
IOutputActivity activities)
throws ManifoldCFException, ServiceInterruption
{
+ return documentIngest(outputConnectionName,
+ identifierClass,
+ identifierHash,
+ documentVersion,
+ outputVersion,
+ null,
+ authorityName,
+ data,
+ ingestTime,
+ documentURI,
+ activities);
+ }
+
+ /** Ingest a document.
+ * This ingests the document, and notes it. If this is a repeat ingestion of
the document, this
+ * method also REMOVES ALL OLD METADATA. When complete, the index will
contain only the metadata
+ * described by the RepositoryDocument object passed to this method.
+ * ServiceInterruption is thrown if the document ingestion must be
rescheduled.
+ *@param outputConnectionName is the name of the output connection associated
with this action.
+ *@param identifierClass is the name of the space in which the identifier
hash should be interpreted.
+ *@param identifierHash is the hashed document identifier.
+ *@param documentVersion is the document version.
+ *@param parameterVersion is the forced parameter version.
+ *@param outputVersion is the output version string constructed from the
output specification by the output connector.
+ *@param authorityName is the name of the authority associated with the
document, if any.
+ *@param data is the document data. The data is closed after ingestion is
complete.
+ *@param ingestTime is the time at which the ingestion took place, in
milliseconds since epoch.
+ *@param documentURI is the URI of the document, which will be used as the
key of the document in the index.
+ *@param activities is an object providing a set of methods that the
implementer can use to perform the operation.
+ *@return true if the ingest was ok, false if the ingest is illegal (and
should not be repeated).
+ */
+ public boolean documentIngest(String outputConnectionName,
+ String identifierClass, String identifierHash,
+ String documentVersion,
+ String outputVersion,
+ String parameterVersion,
+ String authorityName,
+ RepositoryDocument data,
+ long ingestTime, String documentURI,
+ IOutputActivity activities)
+ throws ManifoldCFException, ServiceInterruption
+ {
IOutputConnection connection =
connectionManager.load(outputConnectionName);
String docKey = makeKey(identifierClass,identifierHash);
@@ -367,13 +427,14 @@ public class IncrementalIngester extends
Logging.ingest.debug("Ingesting document '"+docKey+"' into output
connection '"+outputConnectionName+"'");
}
- return
performIngestion(connection,docKey,documentVersion,outputVersion,authorityName,
+ return
performIngestion(connection,docKey,documentVersion,outputVersion,parameterVersion,authorityName,
data,ingestTime,documentURI,activities);
}
+
/** Do the actual ingestion, or just record it if there's nothing to ingest.
*/
protected boolean performIngestion(IOutputConnection connection,
- String docKey, String documentVersion, String outputVersion,
+ String docKey, String documentVersion, String outputVersion, String
parameterVersion,
String authorityNameString,
RepositoryDocument data,
long ingestTime, String documentURI,
@@ -500,15 +561,15 @@ public class IncrementalIngester extends
// This is a marker that says "something is there"; it has an empty
version, which indicates
// that we don't know anything about it. That means it will be
reingested when the
// next version comes along, and will be deleted if called for also.
-
noteDocumentIngest(connection.getName(),docKey,null,null,null,ingestTime,documentURI,documentURIHash);
+
noteDocumentIngest(connection.getName(),docKey,null,null,null,null,ingestTime,documentURI,documentURIHash);
int result =
addOrReplaceDocument(connection,documentURI,outputVersion,data,authorityNameString,activities);
-
noteDocumentIngest(connection.getName(),docKey,documentVersion,outputVersion,authorityNameString,ingestTime,documentURI,documentURIHash);
+
noteDocumentIngest(connection.getName(),docKey,documentVersion,outputVersion,parameterVersion,authorityNameString,ingestTime,documentURI,documentURIHash);
return result == IOutputConnector.DOCUMENTSTATUS_ACCEPTED;
}
// If we get here, it means we are noting that the document was
examined, but that no change was required. This is signaled
// to noteDocumentIngest by having the null documentURI.
-
noteDocumentIngest(connection.getName(),docKey,documentVersion,outputVersion,authorityNameString,ingestTime,null,null);
+
noteDocumentIngest(connection.getName(),docKey,documentVersion,outputVersion,parameterVersion,authorityNameString,ingestTime,null,null);
return true;
}
finally
@@ -524,6 +585,7 @@ public class IncrementalIngester extends
*@param identifierHashes are the set of document identifier hashes.
*@param checkTime is the time at which the check took place, in milliseconds
since epoch.
*/
+ @Override
public void documentCheckMultiple(String outputConnectionName,
String[] identifierClasses, String[] identifierHashes,
long checkTime)
@@ -643,6 +705,7 @@ public class IncrementalIngester extends
*@param identifierHashes is the array of document identifier hashes of the
documents.
*@param activities is the object to use to log the details of the ingestion
attempt. May be null.
*/
+ @Override
public void documentDeleteMultiple(String[] outputConnectionNames,
String[] identifierClasses, String[] identifierHashes,
IOutputRemoveActivity activities)
@@ -690,6 +753,7 @@ public class IncrementalIngester extends
*@param identifierHashes is the array of document identifier hashes of the
documents.
*@param activities is the object to use to log the details of the ingestion
attempt. May be null.
*/
+ @Override
public void documentDeleteMultiple(String outputConnectionName,
String[] identifierClasses, String[] identifierHashes,
IOutputRemoveActivity activities)
@@ -987,6 +1051,7 @@ public class IncrementalIngester extends
*@param identifierHash is the hash of the id of the document.
*@param activities is the object to use to log the details of the ingestion
attempt. May be null.
*/
+ @Override
public void documentDelete(String outputConnectionName,
String identifierClass, String identifierHash,
IOutputRemoveActivity activities)
@@ -1058,6 +1123,7 @@ public class IncrementalIngester extends
*@return the array of document data. Null will come back for any identifier
that doesn't
* exist in the index.
*/
+ @Override
public DocumentIngestStatus[] getDocumentIngestDataMultiple(String[]
outputConnectionNames,
String[] identifierClasses, String[] identifierHashes)
throws ManifoldCFException
@@ -1114,6 +1180,7 @@ public class IncrementalIngester extends
*@return the array of document data. Null will come back for any identifier
that doesn't
* exist in the index.
*/
+ @Override
public DocumentIngestStatus[] getDocumentIngestDataMultiple(String
outputConnectionName,
String[] identifierClasses, String[] identifierHashes)
throws ManifoldCFException
@@ -1175,6 +1242,7 @@ public class IncrementalIngester extends
*@param identifierHash is the hash of the id of the document.
*@return the current document's ingestion data, or null if the document is
not currently ingested.
*/
+ @Override
public DocumentIngestStatus getDocumentIngestData(String
outputConnectionName,
String identifierClass, String identifierHash)
throws ManifoldCFException
@@ -1189,6 +1257,7 @@ public class IncrementalIngester extends
*@param identifierHash is the hash of the id of the document.
*@return the number of milliseconds between changes, or 0 if this cannot be
calculated.
*/
+ @Override
public long getDocumentUpdateInterval(String outputConnectionName,
String identifierClass, String identifierHash)
throws ManifoldCFException
@@ -1203,6 +1272,7 @@ public class IncrementalIngester extends
*@param identifierHashes is the hashes of the ids of the documents.
*@return the number of milliseconds between changes, or 0 if this cannot be
calculated.
*/
+ @Override
public long[] getDocumentUpdateIntervalMultiple(String outputConnectionName,
String[] identifierClasses, String[] identifierHashes)
throws ManifoldCFException
@@ -1316,6 +1386,7 @@ public class IncrementalIngester extends
*@param documentVersion is a string describing the new version of the
document.
*@param outputVersion is the version string calculated for the output
connection.
*@param authorityNameString is the name of the relevant authority connection.
+ *@param packedForcedParameters is the string we use to determine differences
in packed parameters.
*@param ingestTime is the time at which the ingestion took place, in
milliseconds since epoch.
*@param documentURI is the uri the document can be accessed at, or null
(which signals that we are to record the version, but no
* ingestion took place).
@@ -1323,7 +1394,8 @@ public class IncrementalIngester extends
*/
protected void noteDocumentIngest(String outputConnectionName,
String docKey, String documentVersion,
- String outputVersion, String authorityNameString,
+ String outputVersion, String packedForcedParameters,
+ String authorityNameString,
long ingestTime, String documentURI, String documentURIHash)
throws ManifoldCFException
{
@@ -1352,6 +1424,7 @@ public class IncrementalIngester extends
map.clear();
map.put(lastVersionField,documentVersion);
map.put(lastOutputVersionField,outputVersion);
+ map.put(forcedParamsField,packedForcedParameters);
map.put(lastIngestField,new Long(ingestTime));
if (documentURI != null)
{
@@ -1362,7 +1435,7 @@ public class IncrementalIngester extends
map.put(authorityNameField,authorityNameString);
else
map.put(authorityNameField,"");
-
+
// Transaction abort due to deadlock should be retried here.
while (true)
{
@@ -1429,6 +1502,7 @@ public class IncrementalIngester extends
map.clear();
map.put(lastVersionField,documentVersion);
map.put(lastOutputVersionField,outputVersion);
+ map.put(forcedParamsField,packedForcedParameters);
map.put(lastIngestField,new Long(ingestTime));
if (documentURI != null)
{
@@ -1542,8 +1616,8 @@ public class IncrementalIngester extends
new UnitaryClause(outputConnNameField,outputConnectionName)});
// Get the primary records associated with this hash value
- IResultSet set = performQuery("SELECT
"+idField+","+docKeyField+","+lastVersionField+","+lastOutputVersionField+","+authorityNameField+"
FROM "+getTableName()+" WHERE "+
- query,newList,null,null);
+ IResultSet set = performQuery("SELECT
"+idField+","+docKeyField+","+lastVersionField+","+lastOutputVersionField+","+authorityNameField+","+forcedParamsField+
+ " FROM "+getTableName()+" WHERE "+query,newList,null,null);
// Now, go through the original request once more, this time building the
result
int i = 0;
@@ -1558,7 +1632,8 @@ public class IncrementalIngester extends
String lastVersion = (String)row.getValue(lastVersionField);
String lastOutputVersion =
(String)row.getValue(lastOutputVersionField);
String authorityName = (String)row.getValue(authorityNameField);
- rval[position.intValue()] = new
DocumentIngestStatus(lastVersion,lastOutputVersion,authorityName);
+ String paramVersion = (String)row.getValue(forcedParamsField);
+ rval[position.intValue()] = new
DocumentIngestStatus(lastVersion,lastOutputVersion,authorityName,paramVersion);
}
}
}
@@ -1567,7 +1642,9 @@ public class IncrementalIngester extends
/** Add or replace document, using the specified output connection, via the
standard pool.
*/
- protected int addOrReplaceDocument(IOutputConnection connection, String
documentURI, String outputDescription, RepositoryDocument document, String
authorityNameString, IOutputAddActivity activities)
+ protected int addOrReplaceDocument(IOutputConnection connection, String
documentURI, String outputDescription,
+ RepositoryDocument document, String authorityNameString,
+ IOutputAddActivity activities)
throws ManifoldCFException, ServiceInterruption
{
IOutputConnector connector =
OutputConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
Modified:
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/DocumentIngestStatus.java
URL:
http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/DocumentIngestStatus.java?rev=1439174&r1=1439173&r2=1439174&view=diff
==============================================================================
---
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/DocumentIngestStatus.java
(original)
+++
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/DocumentIngestStatus.java
Sun Jan 27 20:43:31 2013
@@ -25,22 +25,25 @@ import java.util.*;
* - version string
* - output version string
* - authority name
+* - parameter version string
*/
public class DocumentIngestStatus
{
public static final String _rcsid = "@(#)$Id: DocumentIngestStatus.java
988245 2010-08-23 18:39:35Z kwright $";
- protected String documentVersionString;
- protected String outputVersionString;
- protected String documentAuthorityNameString;
+ protected final String documentVersionString;
+ protected final String outputVersionString;
+ protected final String documentAuthorityNameString;
+ protected final String parameterVersionString;
/** Constructor */
public DocumentIngestStatus(String documentVersionString, String
outputVersionString,
- String documentAuthorityNameString)
+ String documentAuthorityNameString, String parameterVersionString)
{
this.documentVersionString = documentVersionString;
this.outputVersionString = outputVersionString;
this.documentAuthorityNameString = documentAuthorityNameString;
+ this.parameterVersionString = parameterVersionString;
}
/** Get the document version */
@@ -60,4 +63,11 @@ public class DocumentIngestStatus
{
return documentAuthorityNameString;
}
+
+ /** Get the parameter version string */
+ public String getParameterVersion()
+ {
+ return parameterVersionString;
+ }
+
}
Modified:
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
URL:
http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java?rev=1439174&r1=1439173&r2=1439174&view=diff
==============================================================================
---
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
(original)
+++
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
Sun Jan 27 20:43:31 2013
@@ -20,6 +20,7 @@ package org.apache.manifoldcf.agents.int
import org.apache.manifoldcf.core.interfaces.*;
import java.io.*;
+import java.util.*;
/** This interface describes the incremental ingestion API.
* SOME NOTES:
@@ -148,6 +149,35 @@ public interface IIncrementalIngester
IOutputActivity activities)
throws ManifoldCFException, ServiceInterruption;
+ /** Ingest a document.
+ * This ingests the document, and notes it. If this is a repeat ingestion of
the document, this
+ * method also REMOVES ALL OLD METADATA. When complete, the index will
contain only the metadata
+ * described by the RepositoryDocument object passed to this method.
+ * ServiceInterruption is thrown if the document ingestion must be
rescheduled.
+ *@param outputConnectionName is the name of the output connection associated
with this action.
+ *@param identifierClass is the name of the space in which the identifier
hash should be interpreted.
+ *@param identifierHash is the hashed document identifier.
+ *@param documentVersion is the document version.
+ *@param parameterVersion is the forced parameter version.
+ *@param outputVersion is the output version string constructed from the
output specification by the output connector.
+ *@param authorityName is the name of the authority associated with the
document, if any.
+ *@param data is the document data. The data is closed after ingestion is
complete.
+ *@param ingestTime is the time at which the ingestion took place, in
milliseconds since epoch.
+ *@param documentURI is the URI of the document, which will be used as the
key of the document in the index.
+ *@param activities is an object providing a set of methods that the
implementer can use to perform the operation.
+ *@return true if the ingest was ok, false if the ingest is illegal (and
should not be repeated).
+ */
+ public boolean documentIngest(String outputConnectionName,
+ String identifierClass, String identifierHash,
+ String documentVersion,
+ String outputVersion,
+ String parameterVersion,
+ String authorityName,
+ RepositoryDocument data,
+ long ingestTime, String documentURI,
+ IOutputActivity activities)
+ throws ManifoldCFException, ServiceInterruption;
+
/** Note the fact that we checked a document (and found that it did not need
to be ingested, because the
* versions agreed).
*@param outputConnectionName is the name of the output connection associated
with this action.
Modified:
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/RepositoryDocument.java
URL:
http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/RepositoryDocument.java?rev=1439174&r1=1439173&r2=1439174&view=diff
==============================================================================
---
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/RepositoryDocument.java
(original)
+++
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/RepositoryDocument.java
Sun Jan 27 20:43:31 2013
@@ -233,7 +233,7 @@ public class RepositoryDocument
addField(fieldName,new Reader[]{fieldData});
}
- /** Remove a multivalue character field.
+ /** Add/Remove a multivalue character field.
*@param fieldName is the field name.
*@param fieldData is the multi-valued data (as an array of Strings). Null
means
* to remove the entry from the document.
Modified:
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobDescription.java
URL:
http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobDescription.java?rev=1439174&r1=1439173&r2=1439174&view=diff
==============================================================================
---
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobDescription.java
(original)
+++
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobDescription.java
Sun Jan 27 20:43:31 2013
@@ -225,4 +225,19 @@ public interface IJobDescription
/** Set the hopcount mode. */
public void setHopcountMode(int mode);
+ // Forced metadata
+
+ /** Get the forced metadata.
+ *@return the set as a map, keyed by metadata name, with value a set of
strings.
+ */
+ public Map<String,Set<String>> getForcedMetadata();
+
+ /** Clear forced metadata.
+ */
+ public void clearForcedMetadata();
+
+ /** Add a forced metadata name/value pair.
+ */
+ public void addForcedMetadataValue(String name, String value);
+
}
Modified:
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobDescription.java
URL:
http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobDescription.java?rev=1439174&r1=1439173&r2=1439174&view=diff
==============================================================================
---
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobDescription.java
(original)
+++
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobDescription.java
Sun Jan 27 20:43:31 2013
@@ -75,6 +75,9 @@ public class JobDescription implements I
// Hopcount mode
protected int hopcountMode = HOPCOUNT_ACCURATE;
+ // Forced metadata
+ protected Map<String,Set<String>> forcedMetadata = new
HashMap<String,Set<String>>();
+
// Read-only mode
protected boolean readOnly = false;
@@ -109,6 +112,14 @@ public class JobDescription implements I
Long maxHops = (Long)hopCountFilters.get(linkType);
rval.hopCountFilters.put(linkType,maxHops);
}
+ for (String forcedParamName : forcedMetadata.keySet())
+ {
+ Set<String> values = forcedMetadata.get(forcedParamName);
+ for (String value : values)
+ {
+ rval.addForcedMetadataValue(forcedParamName,value);
+ }
+ }
// Direct modification of this object is possible - so it also has to know
if it is read-only!!
rval.outputSpecification = outputSpecification.duplicate(readOnly);
// Direct modification of this object is possible - so it also has to know
if it is read-only!!
@@ -444,4 +455,34 @@ public class JobDescription implements I
hopcountMode = mode;
}
+ // Forced metadata
+
+ /** Get the forced metadata.
+ *@return the set as a map, keyed by metadata name, with value a set of
strings.
+ */
+ public Map<String,Set<String>> getForcedMetadata()
+ {
+ return forcedMetadata;
+ }
+
+ /** Clear forced metadata.
+ */
+ public void clearForcedMetadata()
+ {
+ forcedMetadata.clear();
+ }
+
+ /** Add a forced metadata name/value pair.
+ */
+ public void addForcedMetadataValue(String name, String value)
+ {
+ Set<String> rval = forcedMetadata.get(name);
+ if (rval == null)
+ {
+ rval = new HashSet<String>();
+ forcedMetadata.put(name,rval);
+ }
+ rval.add(value);
+ }
+
}
Modified:
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
URL:
http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java?rev=1439174&r1=1439173&r2=1439174&view=diff
==============================================================================
---
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
(original)
+++
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
Sun Jan 27 20:43:31 2013
@@ -246,6 +246,7 @@ public class Jobs extends org.apache.man
protected ICacheManager cacheManager;
protected ScheduleManager scheduleManager;
protected HopFilterManager hopFilterManager;
+ protected ForcedParamManager forcedParamManager;
protected IOutputConnectionManager outputMgr;
protected IRepositoryConnectionManager connectionMgr;
@@ -262,6 +263,8 @@ public class Jobs extends org.apache.man
this.threadContext = threadContext;
scheduleManager = new ScheduleManager(threadContext,database);
hopFilterManager = new HopFilterManager(threadContext,database);
+ forcedParamManager = new ForcedParamManager(threadContext,database);
+
cacheManager = CacheManagerFactory.make(threadContext);
outputMgr = OutputConnectionManagerFactory.make(threadContext);
@@ -311,6 +314,7 @@ public class Jobs extends org.apache.man
// Handle related tables
scheduleManager.install(getTableName(),idField);
hopFilterManager.install(getTableName(),idField);
+ forcedParamManager.install(getTableName(),idField);
// Index management
IndexDescription statusIndex = new IndexDescription(false,new
String[]{statusField,idField,priorityField});
@@ -357,6 +361,7 @@ public class Jobs extends org.apache.man
beginTransaction();
try
{
+ forcedParamManager.deinstall();
hopFilterManager.deinstall();
scheduleManager.deinstall();
performDrop(null);
@@ -602,6 +607,7 @@ public class Jobs extends org.apache.man
{
scheduleManager.deleteRows(id);
hopFilterManager.deleteRows(id);
+ forcedParamManager.deleteRows(id);
ArrayList params = new ArrayList();
String query = buildConjunctionClause(params,new ClauseDescription[]{
new UnitaryClause(idField,id)});
@@ -743,6 +749,7 @@ public class Jobs extends org.apache.man
performUpdate(values," WHERE "+query,params,null);
scheduleManager.deleteRows(id);
hopFilterManager.deleteRows(id);
+ forcedParamManager.deleteRows(id);
}
else
{
@@ -760,7 +767,9 @@ public class Jobs extends org.apache.man
scheduleManager.writeRows(id,jobDescription);
// Write hop filter rows
hopFilterManager.writeRows(id,jobDescription);
-
+ // Write forced params
+ forcedParamManager.writeRows(id,jobDescription);
+
cacheManager.invalidateKeys(ch);
break;
}
@@ -2639,6 +2648,7 @@ public class Jobs extends org.apache.man
// Fill in schedules for jobs
scheduleManager.getRows(returnValues,idList,params);
hopFilterManager.getRows(returnValues,idList,params);
+ forcedParamManager.getRows(returnValues,idList,params);
}
catch (NumberFormatException e)
{
Modified:
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL:
http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1439174&r1=1439173&r2=1439174&view=diff
==============================================================================
---
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
(original)
+++
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
Sun Jan 27 20:43:31 2013
@@ -137,6 +137,7 @@ public class WorkerThread extends Thread
// Universal data, from the job
String connectionName = job.getConnectionName();
String outputName = job.getOutputConnectionName();
+ String newParameterVersion =
packParameters(job.getForcedMetadata());
DocumentSpecification spec = job.getSpecification();
OutputSpecification outputSpec = job.getOutputSpecification();
int jobType = job.getType();
@@ -447,6 +448,9 @@ public class WorkerThread extends Thread
String oldOutputVersion =
oldDocStatus.getOutputVersion();
if (oldOutputVersion == null)
oldOutputVersion = "";
+ String oldParameterVersion =
oldDocStatus.getParameterVersion();
+ if (oldParameterVersion == null)
+ oldParameterVersion = "";
// Start the comparison processing
if (newDocVersion.length() == 0)
@@ -456,7 +460,8 @@ public class WorkerThread extends Thread
}
else if (oldDocVersion.equals(newDocVersion) &&
oldAuthorityName.equals(newAuthorityName) &&
- oldOutputVersion.equals(newOutputVersion))
+ oldOutputVersion.equals(newOutputVersion) &&
+
oldParameterVersion.equals(newParameterVersion))
{
// The old logic was as follows:
//
@@ -515,7 +520,7 @@ public class WorkerThread extends Thread
// First, make the things we will need for all
subsequent steps.
ProcessActivity activity = new
ProcessActivity(threadContext,queueTracker,jobManager,ingester,
-
currentTime,job,connection,connector,connMgr,legalLinkTypes,ingestLogger,abortSet,outputVersion);
+
currentTime,job,connection,connector,connMgr,legalLinkTypes,ingestLogger,abortSet,outputVersion,newParameterVersion);
try
{
@@ -1164,6 +1169,48 @@ public class WorkerThread extends Thread
}
}
+ protected static String packParameters(Map<String,Set<String>>
forcedParameters)
+ { // Builds a single string from every forced name/value pair, visiting names and values in sorted order.
+ StringBuilder sb = new StringBuilder();
+ String[] paramNames = new String[forcedParameters.size()];
+ int i = 0;
+ for (String paramName : forcedParameters.keySet())
+ {
+ paramNames[i++] = paramName;
+ }
+ java.util.Arrays.sort(paramNames); // sort names so the map's iteration order cannot change the result
+ for (String paramName : paramNames)
+ {
+ Set<String> values = forcedParameters.get(paramName);
+ String[] paramValues = new String[values.size()];
+ i = 0; // reuse the index for this name's value array
+ for (String paramValue : values)
+ {
+ paramValues[i++] = paramValue;
+ }
+ java.util.Arrays.sort(paramValues); // sort values so the set's iteration order cannot change the result
+ for (String paramValue : paramValues)
+ {
+ pack(sb,paramName,'+'); // the name is written again in front of each of its values
+ pack(sb,paramValue,'+');
+ }
+ }
+ return sb.toString(); // empty string when no name/value pairs were present
+ }
+
+ protected static void pack(StringBuilder sb, String value, char delim)
+ { // Appends value to sb with backslash escaping, then appends delim as a terminator.
+ for (int i = 0; i < value.length(); i++)
+ {
+ char x = value.charAt(i);
+ if (x == delim || x == '\\')
+ { // the delimiter and the backslash itself are each preceded by a backslash
+ sb.append('\\');
+ }
+ sb.append(x);
+ }
+ sb.append(delim); // an unescaped delimiter marks the end of this value
+ }
/** The maximum number of adds that happen in a single transaction */
protected static final int MAX_ADDS_IN_TRANSACTION = 20;
@@ -1370,21 +1417,21 @@ public class WorkerThread extends Thread
protected static class ProcessActivity implements IProcessActivity
{
// Member variables
- protected IThreadContext threadContext;
- protected IJobManager jobManager;
- protected IIncrementalIngester ingester;
- protected boolean ingestAllowed;
- protected long currentTime;
- protected IJobDescription job;
- protected IRepositoryConnection connection;
- protected IRepositoryConnector connector;
- protected IRepositoryConnectionManager connMgr;
- protected String[] legalLinkTypes;
- protected OutputActivity ingestLogger;
- protected QueueTracker queueTracker;
- protected HashMap abortSet;
- protected String outputVersion;
-
+ protected final IThreadContext threadContext;
+ protected final IJobManager jobManager;
+ protected final IIncrementalIngester ingester;
+ protected final long currentTime;
+ protected final IJobDescription job;
+ protected final IRepositoryConnection connection;
+ protected final IRepositoryConnector connector;
+ protected final IRepositoryConnectionManager connMgr;
+ protected final String[] legalLinkTypes;
+ protected final OutputActivity ingestLogger;
+ protected final QueueTracker queueTracker;
+ protected final HashMap abortSet;
+ protected final String outputVersion;
+ protected final String parameterVersion;
+
// We submit references in bulk, because that's way more efficient.
protected HashMap referenceList = new HashMap();
@@ -1403,7 +1450,7 @@ public class WorkerThread extends Thread
*/
public ProcessActivity(IThreadContext threadContext, QueueTracker
queueTracker, IJobManager jobManager, IIncrementalIngester ingester,
long currentTime, IJobDescription job, IRepositoryConnection connection,
IRepositoryConnector connector, IRepositoryConnectionManager connMgr,
- String[] legalLinkTypes, OutputActivity ingestLogger, HashMap abortSet,
String outputVersion)
+ String[] legalLinkTypes, OutputActivity ingestLogger, HashMap abortSet,
String outputVersion, String parameterVersion)
{
this.threadContext = threadContext;
this.queueTracker = queueTracker;
@@ -1418,6 +1465,7 @@ public class WorkerThread extends Thread
this.ingestLogger = ingestLogger;
this.abortSet = abortSet;
this.outputVersion = outputVersion;
+ this.parameterVersion = parameterVersion;
}
/** Clean up any dangling information, before abandoning this process
activity object */
@@ -1447,6 +1495,7 @@ public class WorkerThread extends Thread
*@param originationTime is the time, in ms since epoch, that the document
originated. Pass null if none or unknown.
*@param prereqEventNames are the names of the prerequisite events which
this document requires prior to processing. Pass null if none.
*/
+ @Override
public void addDocumentReference(String localIdentifier, String
parentIdentifier, String relationshipType,
String[] dataNames, Object[][] dataValues, Long originationTime,
String[] prereqEventNames)
throws ManifoldCFException
@@ -1545,6 +1594,7 @@ public class WorkerThread extends Thread
*@param dataValues are the values that correspond to the data names in the
dataNames parameter. May be null only if dataNames is null.
*@param originationTime is the time, in ms since epoch, that the document
originated. Pass null if none or unknown.
*/
+ @Override
public void addDocumentReference(String localIdentifier, String
parentIdentifier, String relationshipType,
String[] dataNames, Object[][] dataValues, Long originationTime)
throws ManifoldCFException
@@ -1563,6 +1613,7 @@ public class WorkerThread extends Thread
*@param dataNames is the list of carry-down data from the parent to the
child. May be null. Each name is limited to 255 characters!
*@param dataValues are the values that correspond to the data names in the
dataNames parameter. May be null only if dataNames is null.
*/
+ @Override
public void addDocumentReference(String localIdentifier, String
parentIdentifier, String relationshipType,
String[] dataNames, Object[][] dataValues)
throws ManifoldCFException
@@ -1579,6 +1630,7 @@ public class WorkerThread extends Thread
* reference. This must be one of the strings returned by the
IRepositoryConnector method
* "getRelationshipTypes()". May be null.
*/
+ @Override
public void addDocumentReference(String localIdentifier, String
parentIdentifier, String relationshipType)
throws ManifoldCFException
{
@@ -1590,6 +1642,7 @@ public class WorkerThread extends Thread
*@param localIdentifier is the local document identifier to add (for the
connector that
* fetched the document).
*/
+ @Override
public void addDocumentReference(String localIdentifier)
throws ManifoldCFException
{
@@ -1601,6 +1654,7 @@ public class WorkerThread extends Thread
*@param dataName is the name of the data items to retrieve.
*@return an array containing the unique data values passed from ALL
parents. Note that these are in no particular order, and there will not be any
duplicates.
*/
+ @Override
public String[] retrieveParentData(String localIdentifier, String dataName)
throws ManifoldCFException
{
@@ -1612,6 +1666,7 @@ public class WorkerThread extends Thread
*@param dataName is the name of the data items to retrieve.
*@return an array containing the unique data values passed from ALL
parents. Note that these are in no particular order, and there will not be any
duplicates.
*/
+ @Override
public CharacterInput[] retrieveParentDataAsFiles(String localIdentifier,
String dataName)
throws ManifoldCFException
{
@@ -1623,6 +1678,7 @@ public class WorkerThread extends Thread
*@param documentIdentifier is the document identifier.
*@param version is the document version.
*/
+ @Override
public void recordDocument(String documentIdentifier, String version)
throws ManifoldCFException, ServiceInterruption
{
@@ -1638,6 +1694,7 @@ public class WorkerThread extends Thread
* also the unique key in the index).
*@param data is the document data. The data is closed after ingestion is
complete.
*/
+ @Override
public void ingestDocument(String documentIdentifier, String version,
String documentURI, RepositoryDocument data)
throws ManifoldCFException, ServiceInterruption
{
@@ -1647,10 +1704,25 @@ public class WorkerThread extends Thread
String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
+ Map<String,Set<String>> forcedMetadata = job.getForcedMetadata();
+
+ // Modify the repository document with forced parameters.
+ for (String paramName : forcedMetadata.keySet())
+ {
+ Set<String> values = forcedMetadata.get(paramName);
+ String[] paramValues = new String[values.size()]; // copy the set into an array for addField
+ int j = 0;
+ for (String value : values)
+ {
+ paramValues[j++] = value;
+ }
+ data.addField(paramName,paramValues); // NOTE(review): deleteDocument(documentIdentifier,version) appears to call ingestDocument(...,null,null); if so, data is null here whenever forced metadata is non-empty - confirm and add a null guard
+ }
+
// First, we need to add into the metadata the stuff from the job
description.
ingester.documentIngest(job.getOutputConnectionName(),
job.getConnectionName(),documentIdentifierHash,
- version,outputVersion,
+ version,outputVersion,parameterVersion,
connection.getACLAuthority(),
data,currentTime,
documentURI,
@@ -1663,6 +1735,7 @@ public class WorkerThread extends Thread
*@param version is the version of the document, as reported by the
getDocumentVersions() method of the
* corresponding repository connector.
*/
+ @Override
public void deleteDocument(String documentIdentifier, String version)
throws ManifoldCFException, ServiceInterruption
{
@@ -1672,12 +1745,13 @@ public class WorkerThread extends Thread
ingestDocument(documentIdentifier,version,null,null);
}
- /** Delete the current document from the search engine index. This method
does NOT keep track of version
- * information for the document and thus can lead to "churn", whereby the
same document is queued, versioned,
- * and removed on subsequent crawls. It therefore should be considered to be
deprecated, in favor of
- * deleteDocument(String localIdentifier, String version).
- *@param documentIdentifier is the document's local identifier.
- */
+ /** Delete the current document from the search engine index. This method
does NOT keep track of version
+ * information for the document and thus can lead to "churn", whereby the
same document is queued, versioned,
+ * and removed on subsequent crawls. It therefore should be considered to
be deprecated, in favor of
+ * deleteDocument(String localIdentifier, String version).
+ *@param documentIdentifier is the document's local identifier.
+ */
+ @Override
public void deleteDocument(String documentIdentifier)
throws ManifoldCFException, ServiceInterruption
{
@@ -1697,6 +1771,7 @@ public class WorkerThread extends Thread
*@param lowerExpireBoundTime is the time in ms since epoch that the expire
time should not fall BELOW, or null if none.
*@param upperExpireBoundTime is the time in ms since epoch that the expire
time should not rise ABOVE, or null if none.
*/
+ @Override
public void setDocumentScheduleBounds(String localIdentifier,
Long lowerRecrawlBoundTime, Long upperRecrawlBoundTime,
Long lowerExpireBoundTime, Long upperExpireBoundTime)
@@ -1725,6 +1800,7 @@ public class WorkerThread extends Thread
*@param localIdentifier is the document's local identifier.
*@param originationTime is the document's origination time, or null if
unknown.
*/
+ @Override
public void setDocumentOriginationTime(String localIdentifier,
Long originationTime)
throws ManifoldCFException
@@ -1846,6 +1922,7 @@ public class WorkerThread extends Thread
* described in the resultCode field. This field is not meant to be
queried on. May be null.
*@param childIdentifiers is a set of child entity identifiers associated
with this activity. May be null.
*/
+ @Override
public void recordActivity(Long startTime, String activityType, Long
dataSize,
String entityIdentifier, String resultCode, String resultDescription,
String[] childIdentifiers)
throws ManifoldCFException
@@ -1953,6 +2030,7 @@ public class WorkerThread extends Thread
* itself being aborted. If the connector should abort, this method will
raise a properly-formed ServiceInterruption, which if thrown to the
* caller, will signal that the current processing activity remains
incomplete and must be retried when the job is resumed.
*/
+ @Override
public void checkJobStillActive()
throws ManifoldCFException, ServiceInterruption
{
@@ -1967,6 +2045,7 @@ public class WorkerThread extends Thread
*@param eventName is the event name.
*@return false if the event is already in the "pending" state.
*/
+ @Override
public boolean beginEventSequence(String eventName)
throws ManifoldCFException
{
@@ -1980,6 +2059,7 @@ public class WorkerThread extends Thread
* the sole right to complete it. Otherwise, race conditions can develop
which would be difficult to diagnose.
*@param eventName is the event name.
*/
+ @Override
public void completeEventSequence(String eventName)
throws ManifoldCFException
{
@@ -1992,6 +2072,7 @@ public class WorkerThread extends Thread
* presumed that the reason for the requeue is because of sequencing issues
synchronized around an underlying event.
*@param localIdentifier is the document identifier to requeue
*/
+ @Override
public void retryDocumentProcessing(String localIdentifier)
throws ManifoldCFException
{
@@ -2003,6 +2084,7 @@ public class WorkerThread extends Thread
*@param mimeType is the mime type to check, not including any character
set specification.
*@return true if the mime type is indexable.
*/
+ @Override
public boolean checkMimeTypeIndexable(String mimeType)
throws ManifoldCFException, ServiceInterruption
{
@@ -2013,6 +2095,7 @@ public class WorkerThread extends Thread
*@param localFile is the local copy of the file to check.
*@return true if the document is indexable.
*/
+ @Override
public boolean checkDocumentIndexable(File localFile)
throws ManifoldCFException, ServiceInterruption
{
@@ -2023,6 +2106,7 @@ public class WorkerThread extends Thread
*@param length is the length to check.
*@return true if the document is indexable.
*/
+ @Override
public boolean checkLengthIndexable(long length)
throws ManifoldCFException, ServiceInterruption
{
@@ -2034,6 +2118,7 @@ public class WorkerThread extends Thread
*@param url is the URL of the document.
*@return true if the file is indexable.
*/
+ @Override
public boolean checkURLIndexable(String url)
throws ManifoldCFException, ServiceInterruption
{
@@ -2044,6 +2129,7 @@ public class WorkerThread extends Thread
*@param simpleString is the simple string.
*@return a global string.
*/
+ @Override
public String createGlobalString(String simpleString)
{
return ManifoldCF.createGlobalString(simpleString);
@@ -2053,6 +2139,7 @@ public class WorkerThread extends Thread
*@param simpleString is the simple string.
*@return a connection-specific string.
*/
+ @Override
public String createConnectionSpecificString(String simpleString)
{
return
ManifoldCF.createConnectionSpecificString(connection.getName(),simpleString);
@@ -2062,6 +2149,7 @@ public class WorkerThread extends Thread
*@param simpleString is the simple string.
*@return a job-specific string.
*/
+ @Override
public String createJobSpecificString(String simpleString)
{
return ManifoldCF.createJobSpecificString(job.getID(),simpleString);